Custody backfill sync (#7907)

#7603


  #### Custody backfill sync service
Similar in many ways to the current backfill service. There may be ways to unify the two services. The difficulty there is that the current backfill service tightly couples blocks and their associated blobs/data columns. Any attempts to unify the two services should be left to a separate PR in my opinion.

#### `SyncNetworkContext`
`SyncNetworkContext` manages custody sync data columns by range requests separately from other sync RPC requests. I think this is a nice separation considering that custody backfill is its own service.

#### Data column import logic
The import logic verifies KZG commitments and checks that each data column's block root matches the block root in the node's store before importing columns.

#### New channel to send messages to `SyncManager`
Now external services can communicate with the `SyncManager`. In this PR this channel is used to trigger a custody sync. Alternatively we may be able to use the existing `mpsc` channel that the `SyncNetworkContext` uses to communicate with the `SyncManager`. I will spend some time reviewing this.


Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>

Co-Authored-By: Eitan Seri- Levi <eserilev@gmail.com>

Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>
This commit is contained in:
Eitan Seri-Levi
2025-10-21 20:51:34 -07:00
committed by GitHub
parent 46dde9afee
commit 33e21634cb
30 changed files with 2958 additions and 200 deletions

View File

@@ -6991,6 +6991,95 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
/// Safely update data column custody info by ensuring that:
/// - cgc values at the updated epoch and the earliest custodied column epoch are equal
/// - we are only decrementing the earliest custodied data column epoch by one epoch
/// - the new earliest data column slot is set to the first slot in `effective_epoch`.
pub fn safely_backfill_data_column_custody_info(
    &self,
    effective_epoch: Epoch,
) -> Result<(), Error> {
    // Nothing custodied yet: there is nothing to backfill behind.
    let Some(earliest_data_column_epoch) = self.earliest_custodied_data_column_epoch() else {
        return Ok(());
    };
    // Only epochs strictly before the current earliest custodied epoch can extend custody.
    if effective_epoch >= earliest_data_column_epoch {
        return Ok(());
    }

    let custody_context = self.data_availability_checker.custody_context();
    let cgc_at_effective_epoch =
        custody_context.custody_group_count_at_epoch(effective_epoch, &self.spec);
    let cgc_at_earliest_data_colum_epoch =
        custody_context.custody_group_count_at_epoch(earliest_data_column_epoch, &self.spec);

    // Refuse the update unless we are stepping back exactly one epoch with an unchanged CGC.
    if cgc_at_effective_epoch != cgc_at_earliest_data_colum_epoch
        || effective_epoch + 1 != earliest_data_column_epoch
    {
        error!(
            ?cgc_at_effective_epoch,
            ?cgc_at_earliest_data_colum_epoch,
            ?effective_epoch,
            ?earliest_data_column_epoch,
            "Couldn't update data column custody info"
        );
        return Err(Error::FailedColumnCustodyInfoUpdate);
    }

    // Record the first slot of `effective_epoch` as the new earliest custodied data column slot.
    self.store.put_data_column_custody_info(Some(
        effective_epoch.start_slot(T::EthSpec::slots_per_epoch()),
    ))?;
    Ok(())
}
/// Compare columns custodied for `epoch` versus columns custodied for the head of the chain
/// and return any column indices that are missing.
pub fn get_missing_columns_for_epoch(&self, epoch: Epoch) -> HashSet<ColumnIndex> {
    let custody_context = self.data_availability_checker.custody_context();
    // Columns we custody at `epoch` today.
    let columns_at_epoch = custody_context
        .custody_columns_for_epoch(Some(epoch), &self.spec)
        .iter()
        .cloned()
        .collect::<HashSet<_>>();
    // Columns required at the head (`None` == current requirements) that `epoch` lacks.
    custody_context
        .custody_columns_for_epoch(None, &self.spec)
        .iter()
        .filter(|index| !columns_at_epoch.contains(index))
        .cloned()
        .collect()
}
/// The DA boundary for custodying columns. It will just be the DA boundary unless we are near
/// the Fulu fork epoch.
///
/// Returns `None` when either no DA boundary is set or the Fulu fork is not scheduled —
/// in both cases we don't try to custody backfill.
pub fn get_column_da_boundary(&self) -> Option<Epoch> {
    // If no DA boundary set, don't try to custody backfill.
    let da_boundary_epoch = self.data_availability_boundary()?;
    // Data columns only exist from the Fulu fork onwards; if it is not scheduled there is
    // nothing to custody.
    let fulu_fork_epoch = self.spec.fulu_fork_epoch?;
    // Never reach back before the fork: clamp the boundary to the Fulu fork epoch.
    Some(da_boundary_epoch.max(fulu_fork_epoch))
}
/// This method serves to get a sense of the current chain health. It is used in block proposal
/// to determine whether we should outsource payload production duties.
///

View File

@@ -247,6 +247,7 @@ pub enum BeaconChainError {
cache_epoch: Epoch,
},
SkipProposerPreparation,
FailedColumnCustodyInfoUpdate,
}
easy_from_to!(SlotProcessingError, BeaconChainError);

View File

@@ -0,0 +1,151 @@
use std::collections::{HashMap, HashSet};
use crate::{
BeaconChain, BeaconChainError, BeaconChainTypes,
data_column_verification::verify_kzg_for_data_column_list,
};
use store::{Error as StoreError, KeyValueStore};
use tracing::{Span, debug, instrument};
use types::{ColumnIndex, DataColumnSidecarList, Epoch, EthSpec, Hash256, Slot};
/// Errors that may occur while importing historical data column sidecars into the store.
#[derive(Debug)]
pub enum HistoricalDataColumnError {
    /// The provided data column sidecar's block root doesn't match the canonical block root
    /// stored in the database at the sidecar's slot.
    NoBlockFound {
        data_column_block_root: Hash256,
        expected_block_root: Hash256,
    },
    /// Logic error: should never occur.
    IndexOutOfBounds,
    /// The provided data column sidecar list doesn't contain columns for the full range of slots for the given epoch.
    MissingDataColumns {
        missing_slots_and_data_columns: Vec<(Slot, ColumnIndex)>,
    },
    /// The provided data column sidecar list contains at least one column with an invalid kzg commitment.
    InvalidKzg,
    /// Internal store error.
    StoreError(StoreError),
    /// Internal beacon chain error.
    BeaconChainError(Box<BeaconChainError>),
}
impl From<StoreError> for HistoricalDataColumnError {
fn from(e: StoreError) -> Self {
Self::StoreError(e)
}
}
impl<T: BeaconChainTypes> BeaconChain<T> {
    /// Store a batch of historical data columns in the database.
    ///
    /// Each data column's block root is verified against the canonical block root stored in
    /// the DB at the column's slot. This function also verifies the columns' KZG commitments
    /// before anything is committed to disk.
    ///
    /// This function requires that the data column sidecar list contains columns for a full epoch.
    ///
    /// Return the number of `data_columns` successfully imported.
    #[instrument(skip_all, fields(columns_imported_count = tracing::field::Empty ))]
    pub fn import_historical_data_column_batch(
        &self,
        epoch: Epoch,
        historical_data_column_sidecar_list: DataColumnSidecarList<T::EthSpec>,
    ) -> Result<usize, HistoricalDataColumnError> {
        // Number of columns queued for import; recorded on the tracing span before returning.
        let mut total_imported = 0;
        // Accumulated key-value store operations, committed atomically at the end.
        let mut ops = vec![];
        // Distinct column indices present in the batch.
        let unique_column_indices = historical_data_column_sidecar_list
            .iter()
            .map(|item| item.index)
            .collect::<HashSet<_>>();
        // Index the batch by (slot, column index) so each column can be matched with the
        // canonical block at its slot. Entries are removed as they are consumed; anything
        // left over afterwards is unexpected extra data.
        let mut slot_and_column_index_to_data_columns = historical_data_column_sidecar_list
            .iter()
            .map(|data_column| ((data_column.slot(), data_column.index), data_column))
            .collect::<HashMap<_, _>>();
        // Walk the canonical block roots for every slot of `epoch`.
        let forward_blocks_iter = self
            .forwards_iter_block_roots_until(
                epoch.start_slot(T::EthSpec::slots_per_epoch()),
                epoch.end_slot(T::EthSpec::slots_per_epoch()),
            )
            .map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?;
        for block_iter_result in forward_blocks_iter {
            let (block_root, slot) = block_iter_result
                .map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?;
            for column_index in unique_column_indices.clone() {
                if let Some(data_column) =
                    slot_and_column_index_to_data_columns.remove(&(slot, column_index))
                {
                    // Skip columns that are already stored for this block.
                    if self
                        .store
                        .get_data_column(&block_root, &data_column.index)?
                        .is_some()
                    {
                        debug!(
                            block_root = ?block_root,
                            column_index = data_column.index,
                            "Skipping data column import as identical data column exists"
                        );
                        continue;
                    }
                    // The sidecar must reference the canonical block at its slot.
                    if block_root != data_column.block_root() {
                        return Err(HistoricalDataColumnError::NoBlockFound {
                            data_column_block_root: data_column.block_root(),
                            expected_block_root: block_root,
                        });
                    }
                    self.store.data_column_as_kv_store_ops(
                        &block_root,
                        data_column.clone(),
                        &mut ops,
                    );
                    total_imported += 1;
                }
            }
        }
        // If we've made it to here with no columns to import, this means there are no blobs
        // for this epoch. `RangeDataColumnBatchRequest` logic should have caught any bad
        // peers withholding columns.
        if historical_data_column_sidecar_list.is_empty() {
            if !ops.is_empty() {
                // This shouldn't be a valid case. If there are no columns to import,
                // there should be no generated db operations.
                return Err(HistoricalDataColumnError::IndexOutOfBounds);
            }
        } else {
            // Verify KZG commitments for the whole batch, then commit it to disk atomically.
            verify_kzg_for_data_column_list(historical_data_column_sidecar_list.iter(), &self.kzg)
                .map_err(|_| HistoricalDataColumnError::InvalidKzg)?;
            self.store.blobs_db.do_atomically(ops)?;
        }
        if !slot_and_column_index_to_data_columns.is_empty() {
            debug!(
                ?epoch,
                extra_data = ?slot_and_column_index_to_data_columns.keys().map(|(slot, _)| slot),
                "We've received unexpected extra data columns, these will not be imported"
            );
        }
        // Extend recorded custody coverage back to `epoch` now that its columns are imported.
        self.data_availability_checker
            .custody_context()
            .update_and_backfill_custody_count_at_epoch(epoch);
        self.safely_backfill_data_column_custody_info(epoch)
            .map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?;
        debug!(?epoch, total_imported, "Imported historical data columns");
        let current_span = Span::current();
        current_span.record("columns_imported_count", total_imported);
        Ok(total_imported)
    }
}

View File

@@ -28,6 +28,7 @@ pub mod fork_choice_signal;
pub mod fork_revert;
pub mod graffiti_calculator;
pub mod historical_blocks;
pub mod historical_data_columns;
pub mod kzg_utils;
pub mod light_client_finality_update_verification;
pub mod light_client_optimistic_update_verification;

View File

@@ -10,7 +10,7 @@ use types::data_column_custody_group::{CustodyIndex, compute_columns_for_custody
use types::{ChainSpec, ColumnIndex, Epoch, EthSpec, Slot};
/// A delay before making the CGC change effective to the data availability checker.
const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30;
pub const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30;
/// Number of slots after which a validator's registration is removed if it has not re-registered.
const VALIDATOR_REGISTRATION_EXPIRY_SLOTS: Slot = Slot::new(256);
@@ -30,8 +30,10 @@ struct ValidatorRegistrations {
///
/// Note: Only stores the epoch value when there's a change in custody requirement.
/// So if epoch 10 and 11 has the same custody requirement, only 10 is stored.
/// This map is never pruned, because currently we never decrease custody requirement, so this
/// map size is contained at 128.
/// This map is only pruned during custody backfill. If epoch 11 has custody requirements
/// that are then backfilled to epoch 10, the value at epoch 11 will be removed and epoch 10
/// will be added to the map instead. This should keep map size constrained to a maximum
/// value of 128.
epoch_validator_custody_requirements: BTreeMap<Epoch, u64>,
}
@@ -99,6 +101,25 @@ impl ValidatorRegistrations {
None
}
}
/// Updates the `epoch_validator_custody_requirements` map by pruning all values on/after `effective_epoch`
/// and updating the map to store the latest validator custody requirements for the `effective_epoch`.
/// Updates the `epoch_validator_custody_requirements` map by pruning all values on/after
/// `effective_epoch` whose custody requirement already matches the latest one, and recording
/// the latest validator custody requirement at `effective_epoch`.
///
/// No-op when no custody requirement has been recorded yet.
pub fn backfill_validator_custody_requirements(&mut self, effective_epoch: Epoch) {
    if let Some(latest_validator_custody) = self.latest_validator_custody_requirement() {
        // Delete records if:
        // 1. The epoch is greater than or equal to `effective_epoch`, AND
        // 2. the cgc requirement matches the latest validator custody requirement.
        self.epoch_validator_custody_requirements
            .retain(|&epoch, custody_requirement| {
                !(epoch >= effective_epoch && *custody_requirement == latest_validator_custody)
            });
        // Unconditionally set the latest requirement at `effective_epoch`; plain `insert`
        // replaces `entry().and_modify().or_insert()` which overwrote in both branches.
        self.epoch_validator_custody_requirements
            .insert(effective_epoch, latest_validator_custody);
    }
}
}
/// Given the `validator_custody_units`, return the custody requirement based on
@@ -250,6 +271,7 @@ impl<E: EthSpec> CustodyContext<E> {
);
return Some(CustodyCountChanged {
new_custody_group_count: updated_cgc,
old_custody_group_count: current_cgc,
sampling_count: self.num_of_custody_groups_to_sample(effective_epoch, spec),
effective_epoch,
});
@@ -282,7 +304,7 @@ impl<E: EthSpec> CustodyContext<E> {
/// minimum sampling size which may exceed the custody group count (CGC).
///
/// See also: [`Self::num_of_custody_groups_to_sample`].
fn custody_group_count_at_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> u64 {
pub fn custody_group_count_at_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> u64 {
if self.current_is_supernode {
spec.number_of_custody_groups
} else {
@@ -360,14 +382,22 @@ impl<E: EthSpec> CustodyContext<E> {
.all_custody_columns_ordered
.get()
.expect("all_custody_columns_ordered should be initialized");
&all_columns_ordered[..custody_group_count]
}
/// Prune validator custody requirement records at/after `effective_epoch` and record the
/// latest requirement at `effective_epoch` instead. Invoked after importing a custody
/// backfill batch for `effective_epoch`.
pub fn update_and_backfill_custody_count_at_epoch(&self, effective_epoch: Epoch) {
    self.validator_registrations
        .write()
        .backfill_validator_custody_requirements(effective_epoch);
}
}
/// The custody count changed because of a change in the
/// number of validators being managed.
pub struct CustodyCountChanged {
pub new_custody_group_count: u64,
pub old_custody_group_count: u64,
pub sampling_count: u64,
pub effective_epoch: Epoch,
}