Add allow reprocess flag

This commit is contained in:
Eitan Seri-Levi
2026-05-24 10:15:13 +03:00
parent 623c8f4617
commit 255773834b
4 changed files with 35 additions and 26 deletions

View File

@@ -649,6 +649,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
subnet_id: DataColumnSubnetId,
column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
seen_duration: Duration,
allow_reprocess: bool,
) {
let slot = column_sidecar.slot();
let block_root = column_sidecar.block_root();
@@ -738,33 +739,37 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
MessageAcceptance::Ignore,
);
// Queue the column for reprocessing when the block arrives.
let processor = self.clone();
let reprocess_msg =
ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn {
beacon_block_root: unknown_block_root,
process_fn: Box::new(move || {
let _ = processor.send_gossip_data_column_sidecar(
message_id,
peer_id,
subnet_id,
column_sidecar,
seen_duration,
);
}),
});
if self
.beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(reprocess_msg),
})
.is_err()
{
debug!(
%unknown_block_root,
"Failed to queue data column for reprocessing"
if allow_reprocess {
// Queue the column for reprocessing when the block arrives.
let processor = self.clone();
let reprocess_msg = ReprocessQueueMessage::UnknownBlockDataColumn(
QueuedGossipDataColumn {
beacon_block_root: unknown_block_root,
process_fn: Box::new(move || {
let _ = processor.send_gossip_data_column_sidecar(
message_id,
peer_id,
subnet_id,
column_sidecar,
seen_duration,
false, // Do not reprocess this message again.
);
}),
},
);
if self
.beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(reprocess_msg),
})
.is_err()
{
debug!(
%unknown_block_root,
"Failed to queue data column for reprocessing"
);
}
}
}
GossipDataColumnError::InvalidVariant

View File

@@ -231,6 +231,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
subnet_id: DataColumnSubnetId,
column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
seen_timestamp: Duration,
allow_reprocess: bool,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = async move {
@@ -241,6 +242,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
subnet_id,
column_sidecar,
seen_timestamp,
allow_reprocess,
)
.await
};

View File

@@ -435,6 +435,7 @@ impl TestRig {
DataColumnSubnetId::from_column_index(*data_column.index(), &self.chain.spec),
data_column.clone(),
Duration::from_secs(0),
true,
)
.unwrap();
}

View File

@@ -435,6 +435,7 @@ impl<T: BeaconChainTypes> Router<T> {
subnet_id,
column_sidecar,
seen_timestamp,
true,
),
)
}