Enable reconstruction for nodes custodying more than 50% of columns and instrument tracing (#8052)

Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>

Co-Authored-By: Jimmy Chen <jimmy@sigmaprime.io>
This commit is contained in:
Jimmy Chen
2025-09-16 18:17:43 +10:00
committed by GitHub
parent 242bdfcf12
commit 3de646c8b3
7 changed files with 76 additions and 72 deletions

View File

@@ -34,7 +34,6 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::HotColdDBError;
use tokio::sync::mpsc::error::TrySendError;
use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
use types::{
Attestation, AttestationData, AttestationRef, AttesterSlashing, BlobSidecar, DataColumnSidecar,
@@ -1054,36 +1053,43 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"Processed data column, waiting for other components"
);
// Instead of triggering reconstruction immediately, schedule it to be run. If
// another column arrives it either completes availability or pushes
// reconstruction back a bit.
let cloned_self = Arc::clone(self);
let block_root = *block_root;
let send_result = self.beacon_processor_send.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction(
QueuedColumnReconstruction {
block_root,
slot: *slot,
process_fn: Box::pin(async move {
cloned_self
.attempt_data_column_reconstruction(block_root)
.await;
}),
},
)),
});
if let Err(TrySendError::Full(WorkEvent {
work:
Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction(
reconstruction,
)),
..
})) = send_result
if self
.chain
.data_availability_checker
.custody_context()
.should_attempt_reconstruction(
slot.epoch(T::EthSpec::slots_per_epoch()),
&self.chain.spec,
)
{
warn!("Unable to send reconstruction to reprocessing");
// Execute it immediately instead.
reconstruction.process_fn.await;
// Instead of triggering reconstruction immediately, schedule it to be run. If
// another column arrives, it either completes availability or pushes
// reconstruction back a bit.
let cloned_self = Arc::clone(self);
let block_root = *block_root;
if self
.beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(
ReprocessQueueMessage::DelayColumnReconstruction(
QueuedColumnReconstruction {
block_root,
slot: *slot,
process_fn: Box::pin(async move {
cloned_self
.attempt_data_column_reconstruction(block_root)
.await;
}),
},
),
),
})
.is_err()
{
warn!("Unable to send reconstruction to reprocessing");
}
}
}
},

View File

@@ -28,7 +28,7 @@ use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, error::TrySendError};
use tracing::{debug, error, trace, warn};
use tracing::{debug, error, instrument, trace, warn};
use types::*;
pub use sync_methods::ChainSegmentProcessId;
@@ -825,30 +825,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
/// Attempt to reconstruct all data columns if the following conditions are satisfied:
/// - Our custody requirement is all columns
/// - We have >= 50% of columns, but not all columns
///
/// Returns `Some(AvailabilityProcessingStatus)` if reconstruction is successfully performed,
/// otherwise returns `None`.
///
/// The `publish_columns` parameter controls whether reconstructed columns should be published
/// to the gossip network.
async fn attempt_data_column_reconstruction(
self: &Arc<Self>,
block_root: Hash256,
) -> Option<AvailabilityProcessingStatus> {
// Only supernodes attempt reconstruction
if !self
.chain
.data_availability_checker
.custody_context()
.current_is_supernode
{
return None;
}
/// Attempts to reconstruct all data columns if the conditions checked in
/// [`DataAvailabilityCheckerInner::check_and_set_reconstruction_started`] are satisfied.
#[instrument(level = "debug", skip_all, fields(?block_root))]
async fn attempt_data_column_reconstruction(self: &Arc<Self>, block_root: Hash256) {
let result = self.chain.reconstruct_data_columns(block_root).await;
match result {
Ok(Some((availability_processing_status, data_columns_to_publish))) => {
self.publish_data_columns_gradually(data_columns_to_publish, block_root);
@@ -864,21 +846,18 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
AvailabilityProcessingStatus::MissingComponents(_, _) => {
debug!(
result = "imported all custody columns",
block_hash = %block_root,
%block_root,
"Block components still missing block after reconstruction"
);
}
}
Some(availability_processing_status)
}
Ok(None) => {
// reason is tracked via the `KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL` metric
trace!(
block_hash = %block_root,
%block_root,
"Reconstruction not required for block"
);
None
}
Err(e) => {
error!(
@@ -886,7 +865,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
error = ?e,
"Error during data column reconstruction"
);
None
}
}
}
@@ -975,6 +953,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// by some nodes on the network as soon as possible. Our hope is that some columns arrive from
/// other nodes in the meantime, obviating the need for us to publish them. If no other
/// publisher exists for a column, it will eventually get published here.
#[instrument(level="debug", skip_all, fields(?block_root, data_column_count=data_columns_to_publish.len()))]
fn publish_data_columns_gradually(
self: &Arc<Self>,
mut data_columns_to_publish: DataColumnSidecarList<T::EthSpec>,

View File

@@ -1009,10 +1009,6 @@ async fn import_gossip_block_acceptably_early() {
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
.await;
}
if num_data_columns > 0 {
rig.assert_event_journal_completes(&[WorkType::ColumnReconstruction])
.await;
}
// Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock
// and check the head in the time between the block arrived early and when its due for