Address more review comments

This commit is contained in:
Pawan Dhananjay
2026-06-03 14:43:41 -07:00
parent 61f38d9365
commit 67a81af225
4 changed files with 57 additions and 90 deletions

View File

@@ -236,28 +236,6 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
return None;
}
// If we have payload envelopes, use the Gloas coupling path
if let Some(ByRangeRequest::Complete(envelopes)) = &self.payloads_request {
let data_columns = match &mut self.block_data_request {
RangeBlockDataRequest::DataColumns { requests, .. } => {
let mut cols = vec![];
for req in requests.values() {
let Some(data) = req.to_finished() else {
return None;
};
cols.extend(data.clone());
}
cols
}
_ => vec![],
};
return Some(Self::responses_gloas(
blocks.to_vec(),
envelopes.clone(),
data_columns,
));
}
// Increment the attempt once this function returns the response or errors
match &mut self.block_data_request {
RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs(
@@ -304,6 +282,12 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
}
}
let payload_envelopes = self.payloads_request.as_ref().and_then(|request| {
request
.to_finished()
.map(|payload_envelopes| payload_envelopes.to_vec())
});
let resp = Self::responses_with_custody_columns(
blocks.to_vec(),
data_columns,
@@ -312,6 +296,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
*attempt,
da_checker,
spec,
payload_envelopes,
);
if let Err(CouplingError::DataColumnPeerFailure {
@@ -402,6 +387,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
Ok(responses)
}
#[allow(clippy::too_many_arguments)]
fn responses_with_custody_columns<T>(
blocks: Vec<Arc<SignedBeaconBlock<E>>>,
data_columns: DataColumnSidecarList<E>,
@@ -410,10 +396,19 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
attempt: usize,
da_checker: Arc<DataAvailabilityChecker<T>>,
spec: Arc<ChainSpec>,
payload_envelopes: Option<Vec<Arc<SignedExecutionPayloadEnvelope<E>>>>,
) -> Result<Vec<RangeSyncBlock<E>>, CouplingError>
where
T: BeaconChainTypes<EthSpec = E>,
{
// Index envelopes by beacon_block_root for correct coupling.
let mut envelopes_by_block_root = payload_envelopes.map(|envelopes| {
envelopes
.into_iter()
.map(|e| (e.beacon_block_root(), e))
.collect::<HashMap<_, _>>()
});
// Group data columns by block_root and index
let mut data_columns_by_block =
HashMap::<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>::new();
@@ -443,7 +438,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
let exceeded_retries = attempt >= MAX_COLUMN_RETRIES;
for block in blocks {
let block_root = get_block_root(&block);
range_sync_blocks.push(if block.num_expected_blobs() > 0 {
let custody_columns = if block.num_expected_blobs() > 0 {
let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root)
else {
let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect();
@@ -451,7 +446,6 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
error: format!("No columns for block {block_root:?} with data"),
faulty_peers: responsible_peers,
exceeded_retries,
});
};
@@ -465,16 +459,21 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column));
} else {
let Some(responsible_peer) = column_to_peer.get(index) else {
return Err(CouplingError::InternalError(format!("Internal error, no request made for column {}", index)));
return Err(CouplingError::InternalError(format!(
"Internal error, no request made for column {}",
index
)));
};
naughty_peers.push((*index, *responsible_peer));
}
}
if !naughty_peers.is_empty() {
return Err(CouplingError::DataColumnPeerFailure {
error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"),
error: format!(
"Peers did not return column for block_root {block_root:?} {naughty_peers:?}"
),
faulty_peers: naughty_peers,
exceeded_retries
exceeded_retries,
});
}
@@ -489,15 +488,31 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
);
}
let block_data = AvailableBlockData::new_with_data_columns(custody_columns.iter().map(|c| c.as_data_column().clone()).collect::<Vec<_>>());
RangeSyncBlock::new(block, block_data, &da_checker, spec.clone())
.map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
custody_columns
.iter()
.map(|c| c.as_data_column().clone())
.collect::<Vec<_>>()
} else {
// Block has no data, expects zero columns
vec![]
};
let range_sync_block = if let Some(envelopes_by_block_root) =
envelopes_by_block_root.as_mut()
{
let envelope = envelopes_by_block_root.remove(&block_root);
let available_envelope =
envelope.map(|env| Box::new(AvailableEnvelope::new(env, custody_columns)));
RangeSyncBlock::new_gloas(block, available_envelope)
.map_err(CouplingError::EnvelopePeerFailure)?
} else if custody_columns.is_empty() {
RangeSyncBlock::new(block, AvailableBlockData::NoData, &da_checker, spec.clone())
.map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
});
} else {
let block_data = AvailableBlockData::new_with_data_columns(custody_columns);
RangeSyncBlock::new(block, block_data, &da_checker, spec.clone())
.map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
};
range_sync_blocks.push(range_sync_block);
}
// Assert that there are no columns left for other blocks
@@ -508,50 +523,10 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
debug!(?remaining_roots, "Not all columns consumed for block");
}
Ok(range_sync_blocks)
}
fn responses_gloas(
blocks: Vec<Arc<SignedBeaconBlock<E>>>,
envelopes: Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
data_columns: DataColumnSidecarList<E>,
) -> Result<Vec<RangeSyncBlock<E>>, CouplingError> {
// Index envelopes by beacon_block_root for correct coupling
let mut envelopes_by_block_root: HashMap<_, _> = envelopes
.into_iter()
.map(|e| (e.beacon_block_root(), e))
.collect();
// Group data columns by block_root
let mut data_columns_by_block = HashMap::<Hash256, Vec<Arc<DataColumnSidecar<E>>>>::new();
for column in data_columns {
let block_root = column.block_root();
data_columns_by_block
.entry(block_root)
.or_default()
.push(column);
}
let mut range_sync_blocks = Vec::with_capacity(blocks.len());
for block in blocks {
let block_root = get_block_root(&block);
let envelope = envelopes_by_block_root.remove(&block_root);
let columns = data_columns_by_block
.remove(&block_root)
.unwrap_or_default();
let available_envelope =
envelope.map(|env| Box::new(AvailableEnvelope::new(env, columns)));
range_sync_blocks.push(
RangeSyncBlock::new_gloas(block, available_envelope)
.map_err(CouplingError::EnvelopePeerFailure)?,
);
}
// Recoverable error, log and continue
if !envelopes_by_block_root.is_empty() {
if let Some(envelopes_by_block_root) = envelopes_by_block_root
&& !envelopes_by_block_root.is_empty()
{
tracing::warn!("Peer returned extra envelopes not matching any block");
}

View File

@@ -1433,8 +1433,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
if self
.chain
.data_availability_checker
.envelopes_required_for_epoch(epoch)
.spec
.gloas_fork_epoch
.is_some_and(|gloas_epoch| epoch >= gloas_epoch)
{
ByRangeRequestType::BlocksAndEnvelopesAndColumns
} else if self