Simplify lookup sync da_checker oracle (#9428)

Implementing gloas lookup sync is currently incompatible with the `GossipBlockProcessResult` mechanism.

Today it's implemented such that if we receive a sucessful `GossipBlockProcessResult` we directly mark the lookup as Complete and delete it. In Gloas we can't delete a lookup after block import, as we may still have FULL child awaiting the payload.

IMO this `GossipBlockProcessResult` brings a lot of headache and edge cases that we can just live without. Also the `reset_request` business is nasty and can easily leave the lookup in a bad state.


  If we get rid of `GossipBlockProcessResult` we only pay the following performance penalty:

- Lookup is created exactly while the block's payload is being execution validated
- (new degradation) we download the block again
- send the block for processing but the duplicate cache prevents double execution

So in the worst case we spend a few KBs of extra download bandwidth. Remember each block is downloaded 8x times through gossip in the happy case.


Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>

Co-Authored-By: Pawan Dhananjay <pawandhananjay@gmail.com>
This commit is contained in:
Lion - dapplion
2026-06-06 01:52:45 +02:00
committed by GitHub
parent 65f1a832e4
commit 8e4df4abab
7 changed files with 13 additions and 306 deletions

View File

@@ -922,14 +922,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
&metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION,
processing_start_time.elapsed().as_millis() as i64,
);
// If a block is in the da_checker, sync maybe awaiting for an event when block is finally
// imported. A block can become imported both after processing a block or data column. If
// importing a block results in `Imported`, notify. Do not notify of data column errors.
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: true,
});
}
AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
trace!(
@@ -1354,16 +1346,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// contributing to the partial.
}
}
// If a block is in the da_checker, sync maybe awaiting for an event when block is finally
// imported. A block can become imported both after processing a block or data column. If a
// importing a block results in `Imported`, notify. Do not notify of data column errors.
if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) {
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: true,
});
}
}
async fn check_reconstruction_trigger(self: &Arc<Self>, slot: Slot, block_root: &Hash256) {
@@ -1898,11 +1880,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
if let Err(e) = &result {
self.maybe_store_invalid_block(&invalid_block_storage, block_root, &block, e);
}
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))),
});
}
pub fn process_gossip_voluntary_exit(

View File

@@ -6,7 +6,7 @@ use crate::{
ChainSegmentProcessId, DuplicateCache, InvalidBlockStorage, NetworkBeaconProcessor,
},
service::NetworkMessage,
sync::{SyncMessage, manager::BlockProcessType},
sync::manager::BlockProcessType,
};
use beacon_chain::block_verification_types::LookupBlock;
use beacon_chain::custody_context::NodeCustodyType;
@@ -76,7 +76,6 @@ struct TestRig {
beacon_processor_tx: BeaconProcessorSend<E>,
work_journal_rx: mpsc::Receiver<&'static str>,
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
sync_rx: mpsc::UnboundedReceiver<SyncMessage<E>>,
duplicate_cache: DuplicateCache,
network_beacon_processor: Arc<NetworkBeaconProcessor<T>>,
_harness: BeaconChainHarness<T>,
@@ -270,7 +269,7 @@ impl TestRig {
beacon_processor_rx,
} = BeaconProcessorChannels::new(&beacon_processor_config);
let (sync_tx, sync_rx) = mpsc::unbounded_channel();
let (sync_tx, _sync_rx) = mpsc::unbounded_channel();
// Default metadata
let meta_data = if spec.is_peer_das_scheduled() {
@@ -375,7 +374,6 @@ impl TestRig {
beacon_processor_tx,
work_journal_rx,
network_rx,
sync_rx,
duplicate_cache,
network_beacon_processor,
_harness: harness,
@@ -844,45 +842,6 @@ impl TestRig {
Some(events)
}
}
/// Listen for sync messages and collect them for a specified duration or until reaching a count.
///
/// Returns None if no messages were received, or Some(Vec) containing the received messages.
pub async fn receive_sync_messages_with_timeout(
&mut self,
timeout: Duration,
count: Option<usize>,
) -> Option<Vec<SyncMessage<E>>> {
let mut events = vec![];
let timeout_future = tokio::time::sleep(timeout);
tokio::pin!(timeout_future);
loop {
// Break if we've received the requested count of messages
if let Some(target_count) = count
&& events.len() >= target_count
{
break;
}
tokio::select! {
_ = &mut timeout_future => break,
maybe_msg = self.sync_rx.recv() => {
match maybe_msg {
Some(msg) => events.push(msg),
None => break, // Channel closed
}
}
}
}
if events.is_empty() {
None
} else {
Some(events)
}
}
}
fn junk_peer_id() -> PeerId {
@@ -1862,65 +1821,6 @@ async fn test_blobs_by_root_post_fulu_should_return_empty() {
assert_eq!(0, actual_count);
}
/// Ensure that data column processing that results in block import sends a sync notification
#[tokio::test]
async fn test_data_column_import_notifies_sync() {
if test_spec::<E>().fulu_fork_epoch.is_none() {
return;
}
let mut rig = TestRig::new(SMALL_CHAIN).await;
let block_root = rig.next_block.canonical_root();
// Enqueue the block first to prepare for data column processing
rig.enqueue_gossip_block();
rig.assert_event_journal_completes(&[WorkType::GossipBlock])
.await;
rig.receive_sync_messages_with_timeout(Duration::from_millis(100), Some(1))
.await
.expect("should receive sync message");
// Enqueue data columns which should trigger block import when complete
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
if num_data_columns > 0 {
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
.await;
}
// Verify block import succeeded
assert_eq!(
rig.head_root(),
block_root,
"block should be imported and become head"
);
// Check that sync was notified of the successful import
let sync_messages = rig
.receive_sync_messages_with_timeout(Duration::from_millis(100), Some(1))
.await
.expect("should receive sync message");
// Verify we received the expected GossipBlockProcessResult message
assert_eq!(
sync_messages.len(),
1,
"should receive exactly one sync message"
);
match &sync_messages[0] {
SyncMessage::GossipBlockProcessResult {
block_root: msg_block_root,
imported,
} => {
assert_eq!(*msg_block_root, block_root, "block root should match");
assert!(*imported, "block should be marked as imported");
}
other => panic!("expected GossipBlockProcessResult, got {:?}", other),
}
}
}
#[tokio::test]
async fn test_data_columns_by_range_request_only_returns_requested_columns() {
if test_spec::<E>().fulu_fork_epoch.is_none() {

View File

@@ -482,39 +482,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.on_lookup_result(lookup_id, lookup_result, "processing_result", cx);
}
pub fn on_external_processing_result(
&mut self,
block_root: Hash256,
imported: bool,
cx: &mut SyncNetworkContext<T>,
) {
let Some((id, lookup)) = self
.single_block_lookups
.iter_mut()
.find(|(_, lookup)| lookup.is_for_block(block_root))
else {
// Ok to ignore gossip process events
return;
};
let lookup_result = if imported {
Ok(LookupResult::Completed)
} else {
// A lookup may be in the following state:
// - Block awaiting processing from a different source
// - Blobs downloaded processed, and inserted into the da_checker
//
// At this point the block fails processing (e.g. execution engine offline) and it is
// removed from the da_checker. Note that ALL components are removed from the da_checker
// so when we re-download and process the block we get the error
// MissingComponentsAfterAllProcessed and get stuck.
lookup.reset_requests();
lookup.continue_requests(cx)
};
let id = *id;
self.on_lookup_result(id, lookup_result, "external_processing_result", cx);
}
/// Makes progress on the immediate children of `block_root`
pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext<T>) {
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self

View File

@@ -139,12 +139,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}
/// Reset the status of all requests (used on block processing failure)
pub fn reset_requests(&mut self) {
self.block_request = BlockRequest::new();
self.data_request = DataRequest::WaitingForBlock;
}
/// Return the slot of this lookup's block if it's currently cached
pub fn peek_downloaded_block_slot(&self) -> Option<Slot> {
self.block_request

View File

@@ -182,11 +182,6 @@ pub enum SyncMessage<E: EthSpec> {
process_type: BlockProcessType,
result: BlockProcessingResult,
},
/// A gossip-received component has completed processing and the block may now be imported.
/// In Fulu this is sent after block or blob processing. In Gloas this is also sent after
/// data column or payload envelope processing triggers availability.
GossipBlockProcessResult { block_root: Hash256, imported: bool },
}
/// The type of processing specified for a received block.
@@ -907,14 +902,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} => self
.block_lookups
.on_processing_result(process_type, result, &mut self.network),
SyncMessage::GossipBlockProcessResult {
block_root,
imported,
} => self.block_lookups.on_external_processing_result(
block_root,
imported,
&mut self.network,
),
SyncMessage::BatchProcessed { sync_type, result } => match sync_type {
ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => {
self.range_sync.handle_block_process_result(

View File

@@ -53,8 +53,8 @@ use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tracing::{Span, debug, debug_span, error, warn};
use types::{
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, ForkContext,
Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
};
pub mod custody;
@@ -849,26 +849,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
match self.chain.get_block_process_status(&block_root) {
// Unknown block, continue request to download
BlockProcessStatus::Unknown => {}
// Block is known and currently processing. Imports from gossip and HTTP API insert the
// block in the da_cache. However, HTTP API is unable to notify sync when it completes
// block import. Returning `Pending` here will result in stuck lookups if the block is
// importing from sync.
BlockProcessStatus::NotValidated(_, source) => match source {
BlockImportSource::Gossip => {
// Lookup sync event safety: If the block is currently in the processing cache, we
// are guaranteed to receive a `SyncMessage::GossipBlockProcessResult` that will
// make progress on this lookup
return Ok(LookupRequestResult::Pending("block in processing cache"));
}
BlockImportSource::Lookup
| BlockImportSource::RangeSync
| BlockImportSource::HttpApi => {
// Lookup, RangeSync or HttpApi block import don't emit the GossipBlockProcessResult
// event. If a lookup happens to be created during block import from one of
// those sources just import the block twice. Otherwise the lookup will get
// stuck. Double imports are fine, they just waste resources.
}
},
// Block is known but processing. The block may turn out to be invalid, so we want sync to
// NOT mark the request as complete yet. The ideal flow would be:
// - Wait for processing to complete
// - Only if there is an error re-download and re-process
// But implementing this introduces complexity and the risk for the lookup to get stuck.
// Instead we always re-download the block eagerly and de-duplicate the processing. So in
// the happy case we just download the block again if the lookup is created while execution
// processing the block.
BlockProcessStatus::NotValidated(..) => {}
// Block is fully validated. If it's not yet imported it's waiting for missing block
// components. Consider this request completed and do nothing.
BlockProcessStatus::ExecutionValidated(block) => {

View File

@@ -1235,12 +1235,6 @@ impl TestRig {
self.assert_empty_network();
}
fn assert_pending_lookup_sync(&self) {
assert!(self.created_lookups() > 0, "no created lookups");
assert_eq!(self.dropped_lookups(), 0, "some dropped lookups");
assert_eq!(self.completed_lookups(), 0, "some completed lookups");
}
/// Assert there is at least one range sync chain created and that all sync chains completed
pub(super) fn assert_successful_range_sync(&self) {
assert!(
@@ -1330,15 +1324,6 @@ impl TestRig {
genesis_fork().fulu_enabled().then(Self::default)
}
fn new_after_deneb_before_fulu() -> Option<Self> {
let fork = genesis_fork();
if fork.deneb_enabled() && !fork.fulu_enabled() {
Some(Self::default())
} else {
None
}
}
pub fn new_fulu_peer_test(fulu_test_type: FuluTestType) -> Option<Self> {
genesis_fork().fulu_enabled().then(|| {
Self::new(TestRigConfig {
@@ -1673,56 +1658,6 @@ impl TestRig {
}
}
fn insert_block_to_da_checker_as_pre_execution(&mut self, block: Arc<SignedBeaconBlock<E>>) {
self.log(&format!(
"Inserting block to availability_cache as pre_execution_block {:?}",
block.canonical_root()
));
self.harness
.chain
.data_availability_checker
.put_pre_execution_block(block.canonical_root(), block, BlockImportSource::Gossip)
.unwrap();
}
fn simulate_block_gossip_processing_becomes_invalid(&mut self, block_root: Hash256) {
self.log(&format!(
"Marking block {block_root:?} in da_checker as execution error"
));
self.harness
.chain
.data_availability_checker
.remove_block_on_execution_error(&block_root);
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: false,
});
}
async fn simulate_block_gossip_processing_becomes_valid(
&mut self,
block: Arc<SignedBeaconBlock<E>>,
) {
let block_root = block.canonical_root();
match self.import_block_to_da_checker(block).await {
AvailabilityProcessingStatus::Imported(block_root) => {
self.log(&format!(
"insert block to da_checker and it imported {block_root:?}"
));
}
AvailabilityProcessingStatus::MissingComponents(_, _) => {
panic!("block not imported after adding to da_checker");
}
}
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: false,
});
}
fn requests_count(&self) -> HashMap<&'static str, usize> {
let mut requests_count = HashMap::new();
for (request, _) in &self.requests {
@@ -2294,48 +2229,6 @@ async fn block_in_da_checker_skips_download() {
);
}
#[tokio::test]
async fn block_in_processing_cache_becomes_invalid() {
let Some(mut r) = TestRig::new_after_deneb_before_fulu() else {
return;
};
r.build_chain(1).await;
let block = r.block_at_slot(1);
r.insert_block_to_da_checker_as_pre_execution(block.clone());
r.trigger_with_last_block();
r.simulate(SimulateConfig::happy_path()).await;
r.assert_pending_lookup_sync();
// Here the only active lookup is waiting for the block to finish processing
// Simulate invalid block, removing it from processing cache
r.simulate_block_gossip_processing_becomes_invalid(block.canonical_root());
// Should download block, then issue blobs request
r.simulate(SimulateConfig::happy_path()).await;
r.assert_successful_lookup_sync();
}
#[tokio::test]
async fn block_in_processing_cache_becomes_valid_imported() {
let Some(mut r) = TestRig::new_after_deneb_before_fulu() else {
return;
};
r.build_chain(1).await;
let block = r.block_at_slot(1);
r.insert_block_to_da_checker_as_pre_execution(block.clone());
r.trigger_with_last_block();
r.simulate(SimulateConfig::happy_path()).await;
r.assert_pending_lookup_sync();
// Here the only active lookup is waiting for the block to finish processing
// Resolve the block from processing step
r.simulate_block_gossip_processing_becomes_valid(block)
.await;
// Should not trigger block or blob request
r.assert_empty_network();
// Resolve blob and expect lookup completed
r.assert_no_active_lookups();
}
macro_rules! fulu_peer_matrix_tests {
(
[$($name:ident => $variant:expr),+ $(,)?]