This commit is contained in:
dapplion
2026-05-19 03:27:15 -06:00
parent e4f40836d8
commit 7739c91a3a

View File

@@ -238,6 +238,7 @@ impl<E: EthSpec> BlockRequest<E> {
#[derive(Debug)]
struct DataRequest<E: EthSpec> {
peers: PeerSet,
slot: Slot,
state: DataRequestState<E>,
}
@@ -251,10 +252,7 @@ enum DataRequestState<E: EthSpec> {
peer_group: PeerGroup,
},
/// Data sent for processing, awaiting result
Processing {
kind: DataDownloadKind,
peer_group: PeerGroup,
},
Processing { peer_group: PeerGroup },
/// Data processing complete (or no data needed)
Complete,
}
@@ -267,16 +265,6 @@ impl<E: EthSpec> DataRequestState<E> {
_ => false,
}
}
fn peer_group(&self) -> Option<&PeerGroup> {
match self {
Self::Downloading(dl) => dl.peek_downloaded_peer_group(),
Self::Downloaded { peer_group, .. } | Self::Processing { peer_group, .. } => {
Some(peer_group)
}
Self::Complete => None,
}
}
}
/// Fork-dependent data download state
@@ -373,17 +361,6 @@ enum DownloadedData<E: EthSpec> {
Columns(DataColumnSidecarList<E>),
}
impl<E: EthSpec> DownloadedData<E> {
fn kind(&self) -> DataDownloadKind {
match self {
DownloadedData::Blobs { expected_blobs, .. } => DataDownloadKind::Blobs {
expected_blobs: *expected_blobs,
},
DownloadedData::Columns(_) => DataDownloadKind::Columns,
}
}
}
/// Enough info to reconstruct a fresh `DataDownload` when we need to retry data download
/// after a processing failure. We can't call `create_data_request` again from here because
/// we're past the `WaitingForBlock` state and don't have the `SyncNetworkContext` (and
@@ -395,26 +372,6 @@ enum DataDownloadKind {
Columns,
}
impl DataDownloadKind {
fn into_fresh_download<E: EthSpec>(
self,
block_root: Hash256,
failed_processing: u8,
) -> DataDownload<E> {
match self {
DataDownloadKind::Blobs { expected_blobs } => DataDownload::Blobs {
block_root,
expected_blobs,
state: SingleLookupRequestState::new_with_processing_failures(failed_processing),
},
DataDownloadKind::Columns => DataDownload::Columns {
block_root,
state: SingleLookupRequestState::new_with_processing_failures(failed_processing),
},
}
}
}
// === Payload request: WaitingForBlock → Downloading → Downloaded → Processing → Complete ===
#[derive(Debug)]
@@ -427,7 +384,6 @@ struct PayloadRequest<E: EthSpec> {
#[educe(Debug)]
enum PayloadRequestState<E: EthSpec> {
Downloading {
block_root: Hash256,
state: SingleLookupRequestState<Arc<SignedExecutionPayloadEnvelope<E>>>,
},
Downloaded {
@@ -496,7 +452,7 @@ impl<E: EthSpec> DataRequestState<E> {
impl<E: EthSpec> PayloadRequestState<E> {
/// Create payload request based on the downloaded block's content and fork.
fn new(slot: Slot, block_root: Hash256, spec: &ChainSpec) -> Self {
fn new(slot: Slot, spec: &ChainSpec) -> Self {
let block_fork = spec.fork_name_at_slot::<E>(slot);
match block_fork {
@@ -508,7 +464,6 @@ impl<E: EthSpec> PayloadRequestState<E> {
| ForkName::Electra
| ForkName::Fulu => Self::Complete,
ForkName::Gloas => Self::Downloading {
block_root,
state: SingleLookupRequestState::new(),
},
}
@@ -610,31 +565,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}
/// Returns whether this lookup's block was produced with a published payload envelope
/// ("full") as seen by the given child's bid reference. Always `false` pre-Gloas: the
/// empty/full distinction only exists post-Gloas. The child's bid carries the parent
/// execution hash, which we match against this block's bid `block_hash`.
pub fn is_full_payload(&self, awaiting_parent: &AwaitingParent) -> bool {
let Some(parent_hash) = awaiting_parent.parent_hash() else {
return false;
};
let Some(block) = self.block_request.peek_block() else {
// Block not yet downloaded — we don't know what peers can serve the
// parent envelope/data yet. Treat conservatively as "not full".
// TODO(gloas): cache peers in a deferred set instead of dropping them
// so we can assign them to data/payload streams once the block arrives.
debug!(
block_root = ?self.block_root,
"is_full_payload called before block downloaded, returning false"
);
return false;
};
match block.message().body().signed_execution_payload_bid() {
Ok(payload) => payload.message.block_hash == parent_hash,
Err(_) => false,
}
}
/// Reset the status of all requests (used on block processing failure)
pub fn reset_requests(&mut self) {
// Increment processing failure counter (we're resetting due to processing error)
@@ -812,6 +742,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// === Data request ===
loop {
match &mut self.data_request {
// None = waiting for block
None => {
// Prefer a block downloaded by this lookup. Otherwise fall back to the
// chain's processing-status cache: the block may already be in the
@@ -834,6 +765,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
.map_err(LookupRequestError::InternalError)?;
self.data_request = Some(DataRequest {
peers,
slot: block.slot(),
state: DataRequestState::new(
block.slot(),
self.block_root,
@@ -886,9 +818,8 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
.map_err(LookupRequestError::SendFailedProcessor)?;
}
}
let kind = data.kind();
let peer_group = peer_group.clone();
request.state = DataRequestState::Processing { kind, peer_group };
request.state = DataRequestState::Processing { peer_group };
// Processing needs an async trigger.
break;
}
@@ -920,11 +851,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
.map_err(LookupRequestError::InternalError)?;
self.payload_request = Some(PayloadRequest {
peers,
state: PayloadRequestState::new(
block.slot(),
self.block_root,
cx.spec(),
),
state: PayloadRequestState::new(block.slot(), cx.spec()),
});
} else {
break;
@@ -998,10 +925,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
Ok(LookupResult::Pending)
}
fn get_peer_set(&self) -> PeerSet {
todo!();
}
fn get_data_peers<E: EthSpec>(
&self,
slot: Slot,
@@ -1077,7 +1000,8 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// Data processing failed — bump the shared processing-failure counter so the
// retry is bounded against `SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS`, then reset.
self.failed_processing = self.failed_processing.saturating_add(1);
self.reset_data_request();
// TODO(gloas-sync): Should this persist some state?
self.data_request = None;
self.continue_requests(cx)
}
}
@@ -1107,7 +1031,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// Bump the shared processing-failure counter to bound retries.
self.failed_processing = self.failed_processing.saturating_add(1);
request.state = PayloadRequestState::Downloading {
block_root: self.block_root,
state: SingleLookupRequestState::new_with_processing_failures(
self.failed_processing,
),
@@ -1116,28 +1039,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}
/// Reset data request to a fresh download, preserving the download kind.
fn reset_data_request(&mut self) {
if let Some(request) = &mut self.data_request {
let kind = match &request.state {
DataRequestState::Downloading(dl) => match dl {
DataDownload::Blobs { expected_blobs, .. } => Some(DataDownloadKind::Blobs {
expected_blobs: *expected_blobs,
}),
DataDownload::Columns { .. } => Some(DataDownloadKind::Columns),
},
DataRequestState::Downloaded { data, .. } => Some(data.kind()),
DataRequestState::Processing { kind, .. } => Some(*kind),
DataRequestState::Complete => None,
};
if let Some(kind) = kind {
request.state = DataRequestState::Downloading(
kind.into_fresh_download(self.block_root, self.failed_processing),
);
}
}
}
// -- Download response handlers --
/// Handle a block download response. Updates download state and advances the lookup.