Merge remote-tracking branch 'sigp/unstable' into gloas-lookup-sync-fixes

This commit is contained in:
dapplion
2026-06-06 11:32:38 +02:00
42 changed files with 1317 additions and 845 deletions

View File

@@ -587,68 +587,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.on_lookup_result(id, lookup_result, "processing_result", cx);
}
pub fn on_external_processing_result(
&mut self,
block_root: Hash256,
imported: bool,
cx: &mut SyncNetworkContext<T>,
) {
let Some(id) = self
.single_block_lookups
.iter()
.find(|(_, lookup)| lookup.is_for_block(block_root))
.map(|(id, _)| *id)
else {
// Ok to ignore gossip process events
return;
};
if imported {
// The block is imported into fork choice. Unblock its children, and complete this
// lookup unless a FULL Gloas child still awaits its payload (post-Gloas the payload
// envelope arrives separately from the block).
let bid_block_hash = self
.single_block_lookups
.get(&id)
.and_then(|lookup| lookup.peek_downloaded_bid_block_hash());
let import_action = match bid_block_hash {
Some(bid_block_hash) => ImportedAction::GloasBlockComplete {
block_root,
bid_block_hash,
},
None => ImportedAction::LookupComplete { block_root },
};
self.continue_child_lookups(import_action, cx);
if !self.has_any_awaiting_children(block_root) {
self.single_block_lookups.remove(&id);
metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED);
self.metrics.completed_lookups += 1;
debug!(
?block_root,
id, "Dropping completed lookup (external import)"
);
}
self.update_metrics();
} else {
// A lookup may be in the following state:
// - Block awaiting processing from a different source
// - Blobs downloaded processed, and inserted into the da_checker
//
// At this point the block fails processing (e.g. execution engine offline) and it is
// removed from the da_checker. Note that ALL components are removed from the da_checker
// so when we re-download and process the block we get the error
// MissingComponentsAfterAllProcessed and get stuck.
let result = {
let Some(lookup) = self.single_block_lookups.get_mut(&id) else {
return;
};
lookup.reset_requests();
lookup.continue_requests(cx)
};
self.on_lookup_result(id, result, "external_processing_result", cx);
}
}
pub fn has_any_awaiting_children(&self, block_root: Hash256) -> bool {
self.single_block_lookups
.iter()

View File

@@ -243,13 +243,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}
/// Reset the status of all requests (used on block processing failure)
pub fn reset_requests(&mut self) {
self.block_request = BlockRequest::new();
self.data_request = DataRequest::WaitingForBlock;
self.payload_request = PayloadRequest::WaitingForBlock;
}
/// Return the slot of this lookup's block if it's currently cached
pub fn peek_downloaded_block_slot(&self) -> Option<Slot> {
self.block_request

View File

@@ -182,11 +182,6 @@ pub enum SyncMessage<E: EthSpec> {
process_type: BlockProcessType,
result: BlockProcessingResult,
},
/// A gossip-received component has completed processing and the block may now be imported.
/// In Fulu this is sent after block or blob processing. In Gloas this is also sent after
/// data column or payload envelope processing triggers availability.
GossipBlockProcessResult { block_root: Hash256, imported: bool },
}
/// The type of processing specified for a received block.
@@ -914,14 +909,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} => self
.block_lookups
.on_processing_result(process_type, result, &mut self.network),
SyncMessage::GossipBlockProcessResult {
block_root,
imported,
} => self.block_lookups.on_external_processing_result(
block_root,
imported,
&mut self.network,
),
SyncMessage::BatchProcessed { sync_type, result } => match sync_type {
ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => {
self.range_sync.handle_block_process_result(

View File

@@ -53,9 +53,8 @@ use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tracing::{Span, debug, debug_span, error, warn};
use types::{
BlobSidecar, BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar,
DataColumnSidecarList, EthSpec, ForkContext, Hash256, SignedBeaconBlock,
SignedExecutionPayloadEnvelope, Slot,
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
};
pub mod custody;
@@ -855,26 +854,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
match self.chain.get_block_process_status(&block_root) {
// Unknown block, continue request to download
BlockProcessStatus::Unknown => {}
// Block is known and currently processing. Imports from gossip and HTTP API insert the
// block in the da_cache. However, HTTP API is unable to notify sync when it completes
// block import. Returning `Pending` here will result in stuck lookups if the block is
// importing from sync.
BlockProcessStatus::NotValidated(_, source) => match source {
BlockImportSource::Gossip => {
// Lookup sync event safety: If the block is currently in the processing cache, we
// are guaranteed to receive a `SyncMessage::GossipBlockProcessResult` that will
// make progress on this lookup
return Ok(LookupRequestResult::Pending("block in processing cache"));
}
BlockImportSource::Lookup
| BlockImportSource::RangeSync
| BlockImportSource::HttpApi => {
// Lookup, RangeSync or HttpApi block import don't emit the GossipBlockProcessResult
// event. If a lookup happens to be created during block import from one of
// those sources just import the block twice. Otherwise the lookup will get
// stuck. Double imports are fine, they just waste resources.
}
},
// Block is known but processing. The block may turn out to be invalid, so we want sync to
// NOT mark the request as complete yet. The ideal flow would be:
// - Wait for processing to complete
// - Only if there is an error re-download and re-process
// But implementing this introduces complexity and the risk for the lookup to get stuck.
// Instead we always re-download the block eagerly and de-duplicate the processing. So in
// the happy case we just download the block again if the lookup is created while execution
// processing the block.
BlockProcessStatus::NotValidated(..) => {}
// Block is fully validated. If it's not yet imported it's waiting for missing block
// components. Consider this request completed and do nothing.
BlockProcessStatus::ExecutionValidated(block) => {

View File

@@ -1470,12 +1470,6 @@ impl TestRig {
self.assert_empty_network();
}
fn assert_pending_lookup_sync(&self) {
assert!(self.created_lookups() > 0, "no created lookups");
assert_eq!(self.dropped_lookups(), 0, "some dropped lookups");
assert_eq!(self.completed_lookups(), 0, "some completed lookups");
}
/// Assert there is at least one range sync chain created and that all sync chains completed
pub(super) fn assert_successful_range_sync(&self) {
assert!(
@@ -1565,15 +1559,6 @@ impl TestRig {
genesis_fork().fulu_enabled().then(Self::default)
}
fn new_after_deneb_before_fulu() -> Option<Self> {
let fork = genesis_fork();
if fork.deneb_enabled() && !fork.fulu_enabled() {
Some(Self::default())
} else {
None
}
}
pub fn new_fulu_peer_test(fulu_test_type: FuluTestType) -> Option<Self> {
genesis_fork().fulu_enabled().then(|| {
Self::new(TestRigConfig {
@@ -1912,56 +1897,6 @@ impl TestRig {
}
}
fn insert_block_to_da_checker_as_pre_execution(&mut self, block: Arc<SignedBeaconBlock<E>>) {
self.log(&format!(
"Inserting block to availability_cache as pre_execution_block {:?}",
block.canonical_root()
));
self.harness
.chain
.data_availability_checker
.put_pre_execution_block(block.canonical_root(), block, BlockImportSource::Gossip)
.unwrap();
}
fn simulate_block_gossip_processing_becomes_invalid(&mut self, block_root: Hash256) {
self.log(&format!(
"Marking block {block_root:?} in da_checker as execution error"
));
self.harness
.chain
.data_availability_checker
.remove_block_on_execution_error(&block_root);
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: false,
});
}
async fn simulate_block_gossip_processing_becomes_valid(
&mut self,
block: Arc<SignedBeaconBlock<E>>,
) {
let block_root = block.canonical_root();
match self.import_block_to_da_checker(block).await {
AvailabilityProcessingStatus::Imported(block_root) => {
self.log(&format!(
"insert block to da_checker and it imported {block_root:?}"
));
}
AvailabilityProcessingStatus::MissingComponents(_, _) => {
panic!("block not imported after adding to da_checker");
}
}
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: false,
});
}
fn requests_count(&self) -> HashMap<&'static str, usize> {
let mut requests_count = HashMap::new();
for (request, _) in &self.requests {
@@ -2584,48 +2519,6 @@ async fn block_in_da_checker_skips_download() {
);
}
#[tokio::test]
async fn block_in_processing_cache_becomes_invalid() {
let Some(mut r) = TestRig::new_after_deneb_before_fulu() else {
return;
};
r.build_chain(1).await;
let block = r.block_at_slot(1);
r.insert_block_to_da_checker_as_pre_execution(block.clone());
r.trigger_with_last_block();
r.simulate(SimulateConfig::happy_path()).await;
r.assert_pending_lookup_sync();
// Here the only active lookup is waiting for the block to finish processing
// Simulate invalid block, removing it from processing cache
r.simulate_block_gossip_processing_becomes_invalid(block.canonical_root());
// Should download block, then issue blobs request
r.simulate(SimulateConfig::happy_path()).await;
r.assert_successful_lookup_sync();
}
#[tokio::test]
async fn block_in_processing_cache_becomes_valid_imported() {
let Some(mut r) = TestRig::new_after_deneb_before_fulu() else {
return;
};
r.build_chain(1).await;
let block = r.block_at_slot(1);
r.insert_block_to_da_checker_as_pre_execution(block.clone());
r.trigger_with_last_block();
r.simulate(SimulateConfig::happy_path()).await;
r.assert_pending_lookup_sync();
// Here the only active lookup is waiting for the block to finish processing
// Resolve the block from processing step
r.simulate_block_gossip_processing_becomes_valid(block)
.await;
// Should not trigger block or blob request
r.assert_empty_network();
// Resolve blob and expect lookup completed
r.assert_no_active_lookups();
}
macro_rules! fulu_peer_matrix_tests {
(
[$($name:ident => $variant:expr),+ $(,)?]