From 8debf97d3dfa082f3c705ec3c9a0414d8581916f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 9 Feb 2026 21:08:50 +0000 Subject: [PATCH] Add data column timestamp tracking - Add seen_timestamp field to KzgVerifiedDataColumn and KzgVerifiedCustodyDataColumn - Update all creation points to capture and pass timestamps - Add all_data_columns_observed field to Timestamps and BlockDelays - Add set_time_data_column_observed method to BlockTimesCache - Update beacon_chain.rs to record data column timestamps - Update overflow_lru_cache to compute data column timestamps - Replace blob_delay_ms with data_column_delay_ms in logs and metrics Co-authored-by: michaelsproul <4452260+michaelsproul@users.noreply.github.com> --- beacon_node/beacon_chain/src/beacon_chain.rs | 25 +++++-- .../beacon_chain/src/block_times_cache.rs | 40 +++++++++- .../beacon_chain/src/canonical_head.rs | 10 +-- .../src/data_availability_checker.rs | 14 +++- .../overflow_lru_cache.rs | 7 +- .../src/data_column_verification.rs | 75 ++++++++++++++++--- .../beacon_chain/src/fetch_blobs/mod.rs | 6 +- 7 files changed, 144 insertions(+), 33 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index ec79153785..5754668bc8 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3777,17 +3777,26 @@ impl BeaconChain { consensus_context, } = import_data; - // Record the time at which this block's blobs became available. + // Record the time at which this block's blobs or data columns became available. 
if let Some(blobs_available) = block.blobs_available_timestamp() { - self.block_times_cache.write().set_time_blob_observed( - block_root, - block.slot(), - blobs_available, - ); + // Check if we're using data columns (post-Fulu) by looking at the block data + match block.data() { + AvailableBlockData::DataColumns(_) => { + self.block_times_cache + .write() + .set_time_data_column_observed(block_root, block.slot(), blobs_available); + } + AvailableBlockData::Blobs(_) => { + self.block_times_cache + .write() + .set_time_blob_observed(block_root, block.slot(), blobs_available); + } + AvailableBlockData::NoData => { + // No data to record + } + } } - // TODO(das) record custody column available timestamp - let block_root = { // Capture the current span before moving into the blocking task let current_span = tracing::Span::current(); diff --git a/beacon_node/beacon_chain/src/block_times_cache.rs b/beacon_node/beacon_chain/src/block_times_cache.rs index e8d4c75dce..03dc5ae5d2 100644 --- a/beacon_node/beacon_chain/src/block_times_cache.rs +++ b/beacon_node/beacon_chain/src/block_times_cache.rs @@ -19,6 +19,7 @@ type BlockRoot = Hash256; pub struct Timestamps { pub observed: Option, pub all_blobs_observed: Option, + pub all_data_columns_observed: Option, pub consensus_verified: Option, pub started_execution: Option, pub executed: Option, @@ -34,13 +35,15 @@ pub struct BlockDelays { pub observed: Option, /// The time after the start of the slot we saw all blobs. pub all_blobs_observed: Option, + /// The time after the start of the slot we saw all data columns. + pub all_data_columns_observed: Option, /// The time it took to complete consensus verification of the block. pub consensus_verification_time: Option, /// The time it took to complete execution verification of the block. pub execution_time: Option, /// The delay from the start of the slot before the block became available /// - /// Equal to max(`observed + execution_time`, `all_blobs_observed`). 
+ /// Equal to max(`observed + execution_time`, `all_data_columns_observed` if recorded, else `all_blobs_observed`). pub available: Option, /// Time after `available`. pub attestable: Option, @@ -62,6 +65,11 @@ impl BlockDelays { let all_blobs_observed = times .all_blobs_observed .and_then(|all_blobs_observed| all_blobs_observed.checked_sub(slot_start_time)); + let all_data_columns_observed = times + .all_data_columns_observed + .and_then(|all_data_columns_observed| { + all_data_columns_observed.checked_sub(slot_start_time) + }); let consensus_verification_time = times .consensus_verified .and_then(|consensus_verified| consensus_verified.checked_sub(times.observed?)); @@ -69,9 +77,14 @@ impl BlockDelays { .executed .and_then(|executed| executed.checked_sub(times.started_execution?)); // Duration since UNIX epoch at which block became available. + // Use data columns if present, otherwise fall back to blobs + let availability_timestamp = times + .all_data_columns_observed + .or(times.all_blobs_observed) + .unwrap_or_default(); let available_time = times .executed - .map(|executed| std::cmp::max(executed, times.all_blobs_observed.unwrap_or_default())); + .map(|executed| std::cmp::max(executed, availability_timestamp)); // Duration from the start of the slot until the block became available. let available_delay = available_time.and_then(|available_time| available_time.checked_sub(slot_start_time)); @@ -87,6 +100,7 @@ impl BlockDelays { BlockDelays { observed, all_blobs_observed, + all_data_columns_observed, consensus_verification_time, execution_time, available: available_delay, @@ -179,6 +193,28 @@ impl BlockTimesCache { } } + pub fn set_time_data_column_observed( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + ) { + // Unlike other functions in this file, we update the data column observed time only if it + // is *greater* than existing data column observation times. This allows us to know the + // observation time of the last data column to arrive. 
+ let block_times = self + .cache + .entry(block_root) + .or_insert_with(|| BlockTimesCacheValue::new(slot)); + if block_times + .timestamps + .all_data_columns_observed + .is_none_or(|prev| timestamp > prev) + { + block_times.timestamps.all_data_columns_observed = Some(timestamp); + } + } + /// Set the timestamp for `field` if that timestamp is less than any previously known value. /// /// If no previous value is known for the field, then the supplied timestamp will always be diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 1a08ac3f88..4f6caaedfe 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -1359,12 +1359,12 @@ fn observe_head_block_delays( .as_millis() as i64, ); - // The time from the start of the slot when all blobs have been observed. Technically this - // is the time we last saw a blob related to this block/slot. + // The time from the start of the slot when all data columns have been observed. Technically + // this is the time we last saw a data column related to this block/slot. 
metrics::set_gauge( &metrics::BEACON_BLOB_DELAY_ALL_OBSERVED_SLOT_START, block_delays - .all_blobs_observed + .all_data_columns_observed .unwrap_or_else(|| Duration::from_secs(0)) .as_millis() as i64, ); @@ -1445,7 +1445,7 @@ fn observe_head_block_delays( slot = %head_block_slot, total_delay_ms = block_delay_total.as_millis(), observed_delay_ms = format_delay(&block_delays.observed), - blob_delay_ms = format_delay(&block_delays.all_blobs_observed), + data_column_delay_ms = format_delay(&block_delays.all_data_columns_observed), consensus_time_ms = format_delay(&block_delays.consensus_verification_time), execution_time_ms = format_delay(&block_delays.execution_time), available_delay_ms = format_delay(&block_delays.available), @@ -1479,7 +1479,7 @@ fn observe_head_block_delays( slot = %head_block_slot, total_delay_ms = block_delay_total.as_millis(), observed_delay_ms = format_delay(&block_delays.observed), - blob_delay_ms = format_delay(&block_delays.all_blobs_observed), + data_column_delay_ms = format_delay(&block_delays.all_data_columns_observed), consensus_time_ms = format_delay(&block_delays.consensus_verification_time), execution_time_ms = format_delay(&block_delays.execution_time), available_delay_ms = format_delay(&block_delays.available), diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index e266e02f7f..5a4050ac09 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -240,10 +240,18 @@ impl DataAvailabilityChecker { slot: Slot, custody_columns: DataColumnSidecarList, ) -> Result, AvailabilityCheckError> { + let seen_timestamp = self + .slot_clock + .now_duration() + .unwrap_or(Duration::from_secs(0)); + // Attributes fault to the specific peer that sent an invalid column - let kzg_verified_columns = - KzgVerifiedDataColumn::from_batch_with_scoring(custody_columns, &self.kzg) - 
.map_err(AvailabilityCheckError::InvalidColumn)?; + let kzg_verified_columns = KzgVerifiedDataColumn::from_batch_with_scoring( + custody_columns, + &self.kzg, + seen_timestamp, + ) + .map_err(AvailabilityCheckError::InvalidColumn)?; // Filter out columns that aren't required for custody for this slot // This is required because `data_columns_by_root` requests the **latest** CGC that _may_ diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index f7bd646f82..c4a05551f8 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -282,8 +282,11 @@ impl PendingComponents { .flatten() .map(|blob| blob.seen_timestamp()) .max(), - // TODO(das): To be fixed with https://github.com/sigp/lighthouse/pull/6850 - AvailableBlockData::DataColumns(_) => None, + AvailableBlockData::DataColumns(_) => self + .verified_data_columns + .iter() + .map(|col| col.seen_timestamp()) + .max(), }; let AvailabilityPendingExecutedBlock { diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index cf3385ec5b..1ed3726500 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -16,6 +16,7 @@ use ssz_types::VariableList; use std::iter; use std::marker::PhantomData; use std::sync::Arc; +use std::time::Duration; use tracing::{debug, instrument}; use types::data::ColumnIndex; use types::{ @@ -254,6 +255,8 @@ impl GossipVerifiedDataColumn ) -> Result { verify_data_column_sidecar(&column_sidecar, &chain.spec)?; + let seen_timestamp = chain.slot_clock.now_duration().unwrap_or_default(); + // Check if the data column is already in the DA checker cache. 
This happens when data columns // are made available through the `engine_getBlobs` method. If it exists in the cache, we know // it has already passed the gossip checks, even though this particular instance hasn't been @@ -274,7 +277,10 @@ impl GossipVerifiedDataColumn Ok(Self { block_root: column_sidecar.block_root(), - data_column: KzgVerifiedDataColumn::from_execution_verified(column_sidecar), + data_column: KzgVerifiedDataColumn::from_execution_verified( + column_sidecar, + seen_timestamp, + ), _phantom: Default::default(), }) } @@ -320,35 +326,51 @@ impl GossipVerifiedDataColumn #[ssz(struct_behaviour = "transparent")] pub struct KzgVerifiedDataColumn { data: Arc>, + #[ssz(skip_serializing, skip_deserializing)] + seen_timestamp: Duration, } impl KzgVerifiedDataColumn { pub fn new( data_column: Arc>, kzg: &Kzg, + seen_timestamp: Duration, ) -> Result, KzgError)> { - verify_kzg_for_data_column(data_column, kzg) + verify_kzg_for_data_column(data_column, kzg, seen_timestamp) } /// Mark a data column as KZG verified. Caller must ONLY use this on columns constructed /// from EL blobs. - pub fn from_execution_verified(data_column: Arc>) -> Self { - Self { data: data_column } + pub fn from_execution_verified( + data_column: Arc>, + seen_timestamp: Duration, + ) -> Self { + Self { + data: data_column, + seen_timestamp, + } } /// Create a `KzgVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY. 
pub(crate) fn __new_for_testing(data_column: Arc>) -> Self { - Self { data: data_column } + Self { + data: data_column, + seen_timestamp: Duration::from_secs(0), + } } pub fn from_batch_with_scoring( data_columns: Vec>>, kzg: &Kzg, + seen_timestamp: Duration, ) -> Result, (Option, KzgError)> { verify_kzg_for_data_column_list(data_columns.iter(), kzg)?; Ok(data_columns .into_iter() - .map(|column| Self { data: column }) + .map(|column| Self { + data: column, + seen_timestamp, + }) .collect()) } @@ -366,6 +388,10 @@ impl KzgVerifiedDataColumn { pub fn index(&self) -> ColumnIndex { *self.data.index() } + + pub fn seen_timestamp(&self) -> Duration { + self.seen_timestamp + } } pub type CustodyDataColumnList = @@ -407,14 +433,18 @@ impl CustodyDataColumn { #[ssz(struct_behaviour = "transparent")] pub struct KzgVerifiedCustodyDataColumn { data: Arc>, + #[ssz(skip_serializing, skip_deserializing)] + seen_timestamp: Duration, } impl KzgVerifiedCustodyDataColumn { /// Mark a column as custody column. 
Caller must ensure that our current custody requirements /// include this column pub fn from_asserted_custody(kzg_verified: KzgVerifiedDataColumn) -> Self { + let seen_timestamp = kzg_verified.seen_timestamp(); Self { data: kzg_verified.to_data_column(), + seen_timestamp, } } @@ -422,10 +452,12 @@ impl KzgVerifiedCustodyDataColumn { pub fn new( data_column: CustodyDataColumn, kzg: &Kzg, + seen_timestamp: Duration, ) -> Result, KzgError)> { - verify_kzg_for_data_column(data_column.clone_arc(), kzg)?; + verify_kzg_for_data_column(data_column.clone_arc(), kzg, seen_timestamp)?; Ok(Self { data: data_column.data, + seen_timestamp, }) } @@ -443,10 +475,18 @@ impl KzgVerifiedCustodyDataColumn { spec, )?; + // Use the maximum timestamp from the partial set for reconstructed columns + let seen_timestamp = partial_set_of_columns + .iter() + .map(|col| col.seen_timestamp) + .max() + .unwrap_or_default(); + Ok(all_data_columns .into_iter() - .map(|data| { - KzgVerifiedCustodyDataColumn::from_asserted_custody(KzgVerifiedDataColumn { data }) + .map(|data| KzgVerifiedCustodyDataColumn { + data, + seen_timestamp, }) .collect::>()) } @@ -464,6 +504,10 @@ impl KzgVerifiedCustodyDataColumn { pub fn index(&self) -> ColumnIndex { *self.data.index() } + + pub fn seen_timestamp(&self) -> Duration { + self.seen_timestamp + } } /// Complete kzg verification for a `DataColumnSidecar`. @@ -473,10 +517,14 @@ impl KzgVerifiedCustodyDataColumn { pub fn verify_kzg_for_data_column( data_column: Arc>, kzg: &Kzg, + seen_timestamp: Duration, ) -> Result, (Option, KzgError)> { let _timer = metrics::start_timer(&metrics::KZG_VERIFICATION_DATA_COLUMN_SINGLE_TIMES); validate_data_columns(kzg, iter::once(&data_column))?; - Ok(KzgVerifiedDataColumn { data: data_column }) + Ok(KzgVerifiedDataColumn { + data: data_column, + seen_timestamp, + }) } /// Complete kzg verification for a list of `DataColumnSidecar`s. 
@@ -518,6 +566,8 @@ pub fn validate_data_column_sidecar_for_gossip_fulu( // This filtering ensures we only import and publish the custody columns. // `DataAvailabilityChecker` requires a strict match on custody columns count to // consider a block available. + let seen_timestamp = timestamp_now(); let mut custody_columns = data_columns_result .map(|data_columns| { data_columns @@ -384,7 +385,10 @@ async fn compute_custody_columns_to_import( .filter(|col| custody_columns_indices.contains(col.index())) .map(|col| { KzgVerifiedCustodyDataColumn::from_asserted_custody( - KzgVerifiedDataColumn::from_execution_verified(col), + KzgVerifiedDataColumn::from_execution_verified( + col, + seen_timestamp, + ), ) }) .collect::>()