From 5d0f8a083ae3c23134e61686a7f12441c3256240 Mon Sep 17 00:00:00 2001
From: Eitan Seri-Levi
Date: Mon, 3 Nov 2025 00:06:06 -0800
Subject: [PATCH] Ensure custody backfill sync couples all responses before
 importing (#8339)

Custody backfill sync has a bug when we request columns from more than one
peer per batch. The fix here ensures we wait for all requests to complete
before verifying and importing the responses.

I've also added an endpoint `lighthouse/custody/backfill` that resets a
node's earliest available data column epoch to the current epoch so that
custody backfill can be triggered. This endpoint is needed to rescue any
nodes that may have missing columns due to the custody backfill sync bug,
without requiring a full re-sync.

Co-Authored-By: Eitan Seri-Levi
Co-Authored-By: Jimmy Chen
Co-Authored-By: Michael Sproul
---
 .../beacon_chain/src/custody_context.rs       | 81 +++++++++++++++++--
 .../src/historical_data_columns.rs            |  5 --
 beacon_node/http_api/src/lib.rs               | 32 ++++++++
 beacon_node/http_api/src/test_utils.rs        | 18 +++++
 .../tests/broadcast_validation_tests.rs       |  2 +
 beacon_node/http_api/tests/fork_tests.rs      |  2 +
 .../http_api/tests/interactive_tests.rs       | 65 +++++++++++++++
 .../src/sync/custody_backfill_sync/mod.rs     |  5 +-
 .../sync/range_data_column_batch_request.rs   | 21 ++---
 book/src/api_lighthouse.md                    | 10 +++
 common/eth2/src/lighthouse.rs                 | 13 +++
 11 files changed, 230 insertions(+), 24 deletions(-)

diff --git a/beacon_node/beacon_chain/src/custody_context.rs b/beacon_node/beacon_chain/src/custody_context.rs
index 9a6f51174a..a5ef3ed2f6 100644
--- a/beacon_node/beacon_chain/src/custody_context.rs
+++ b/beacon_node/beacon_chain/src/custody_context.rs
@@ -120,9 +120,7 @@ impl ValidatorRegistrations {
             let effective_epoch =
                 (current_slot + effective_delay_slots).epoch(E::slots_per_epoch()) + 1;
             self.epoch_validator_custody_requirements
-                .entry(effective_epoch)
-                .and_modify(|old_custody| *old_custody = validator_custody_requirement)
-                .or_insert(validator_custody_requirement);
+                .insert(effective_epoch, validator_custody_requirement);
             Some((effective_epoch, validator_custody_requirement))
         } else {
             None
@@ -154,11 +152,25 @@ impl ValidatorRegistrations {
         });
 
         self.epoch_validator_custody_requirements
-            .entry(effective_epoch)
-            .and_modify(|old_custody| *old_custody = latest_validator_custody)
-            .or_insert(latest_validator_custody);
+            .insert(effective_epoch, latest_validator_custody);
     }
+
+    /// Updates the `epoch -> cgc` map by pruning records before `effective_epoch`
+    /// while setting the `cgc` at `effective_epoch` to the latest validator custody requirement.
+    ///
+    /// This is used to restart custody backfill sync at `effective_epoch`.
+    pub fn reset_validator_custody_requirements(&mut self, effective_epoch: Epoch) {
+        if let Some(latest_validator_custody_requirements) =
+            self.latest_validator_custody_requirement()
+        {
+            self.epoch_validator_custody_requirements
+                .retain(|&epoch, _| epoch >= effective_epoch);
+
+            self.epoch_validator_custody_requirements
+                .insert(effective_epoch, latest_validator_custody_requirements);
+        };
+    }
 }
 
 /// Given the `validator_custody_units`, return the custody requirement based on
@@ -535,6 +547,14 @@ impl CustodyContext {
             .write()
             .backfill_validator_custody_requirements(effective_epoch, expected_cgc);
     }
+
+    /// The node is attempting to restart custody backfill. Update the internal records so that
+    /// custody backfill can start backfilling at `effective_epoch`.
+    pub fn reset_validator_custody_requirements(&self, effective_epoch: Epoch) {
+        self.validator_registrations
+            .write()
+            .reset_validator_custody_requirements(effective_epoch);
+    }
 }
 
 /// Indicates that the custody group count (CGC) has increased.
@@ -1491,4 +1511,53 @@ mod tests {
             );
         }
     }
+
+    #[test]
+    fn reset_validator_custody_requirements() {
+        let spec = E::default_spec();
+        let minimum_cgc = 4u64;
+        let initial_cgc = 8u64;
+        let mid_cgc = 16u64;
+        let final_cgc = 32u64;
+
+        // Setup: node restart after multiple validator registrations causing CGC increases.
+        let head_epoch = Epoch::new(20);
+        let epoch_and_cgc_tuples = vec![
+            (Epoch::new(0), initial_cgc),
+            (Epoch::new(10), mid_cgc),
+            (head_epoch, final_cgc),
+        ];
+        let custody_context = setup_custody_context(&spec, head_epoch, epoch_and_cgc_tuples);
+
+        // Backfill from epoch 20 down to epoch 9.
+        complete_backfill_for_epochs(&custody_context, Epoch::new(20), Epoch::new(9), final_cgc);
+
+        // Reset validator custody requirements, pruning records before `head_epoch` and
+        // pinning the latest CGC requirement at `head_epoch`.
+        custody_context.reset_validator_custody_requirements(head_epoch);
+
+        // Verify epochs 0-19 return the minimum CGC requirement because of the reset.
+        for epoch in 0..=19 {
+            assert_eq!(
+                custody_context.custody_group_count_at_epoch(Epoch::new(epoch), &spec),
+                minimum_cgc,
+            );
+        }
+
+        // Verify epoch 20 returns a CGC of 32.
+        assert_eq!(
+            custody_context.custody_group_count_at_epoch(head_epoch, &spec),
+            final_cgc
+        );
+
+        // Rerun backfill from epoch 20 down to epoch 0.
+        complete_backfill_for_epochs(&custody_context, Epoch::new(20), Epoch::new(0), final_cgc);
+
+        // Verify epochs 0-20 return the final CGC requirement.
+        for epoch in 0..=20 {
+            assert_eq!(
+                custody_context.custody_group_count_at_epoch(Epoch::new(epoch), &spec),
+                final_cgc,
+            );
+        }
+    }
 }
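The map manipulation above is the crux of the reset: prune everything before the effective epoch, then pin the latest requirement at that epoch, so lookups for earlier epochs fall back to the spec's minimum and backfill re-fetches them. A minimal, self-contained sketch of those semantics (not the Lighthouse implementation; the real map lives in `ValidatorRegistrations` and uses `Epoch` keys, and the minimum here stands in for `spec.custody_requirement`):

```rust
use std::collections::BTreeMap;

// Assumed minimum custody requirement, matching the test above.
const MINIMUM_CGC: u64 = 4;

/// Simplified stand-in for the `epoch -> cgc` map.
struct EpochCgcMap(BTreeMap<u64, u64>);

impl EpochCgcMap {
    /// Prune all records before `effective_epoch` and pin the latest CGC at `effective_epoch`.
    fn reset(&mut self, effective_epoch: u64) {
        if let Some(latest_cgc) = self.0.values().next_back().copied() {
            self.0.retain(|&epoch, _| epoch >= effective_epoch);
            self.0.insert(effective_epoch, latest_cgc);
        }
    }

    /// CGC at an epoch: the value recorded at the latest epoch at or before the
    /// queried one, falling back to the minimum requirement if no record applies.
    fn cgc_at(&self, epoch: u64) -> u64 {
        self.0
            .range(..=epoch)
            .next_back()
            .map(|(_, cgc)| *cgc)
            .unwrap_or(MINIMUM_CGC)
    }
}

fn main() {
    // Same shape as the test: CGC increases recorded at epochs 0, 10 and 20.
    let mut map = EpochCgcMap(BTreeMap::from([(0, 8), (10, 16), (20, 32)]));
    map.reset(20);
    assert_eq!(map.cgc_at(5), MINIMUM_CGC); // records before epoch 20 were pruned
    assert_eq!(map.cgc_at(20), 32); // latest CGC pinned at the effective epoch
}
```

Dropping the earlier records is what re-opens those epochs for custody backfill: they now advertise only the minimum requirement, so the sync logic sees the extra columns as missing and fetches them again.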
diff --git a/beacon_node/beacon_chain/src/historical_data_columns.rs b/beacon_node/beacon_chain/src/historical_data_columns.rs
index 9304f06570..6cf947adcb 100644
--- a/beacon_node/beacon_chain/src/historical_data_columns.rs
+++ b/beacon_node/beacon_chain/src/historical_data_columns.rs
@@ -89,11 +89,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
                 .get_data_column(&block_root, &data_column.index)?
                 .is_some()
             {
-                debug!(
-                    block_root = ?block_root,
-                    column_index = data_column.index,
-                    "Skipping data column import as identical data column exists"
-                );
                 continue;
             }
             if block_root != data_column.block_root() {
diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs
index 41cd729a68..9026792b91 100644
--- a/beacon_node/http_api/src/lib.rs
+++ b/beacon_node/http_api/src/lib.rs
@@ -4604,6 +4604,37 @@ pub fn serve(
         },
     );
 
+    // POST lighthouse/custody/backfill
+    let post_lighthouse_custody_backfill = warp::path("lighthouse")
+        .and(warp::path("custody"))
+        .and(warp::path("backfill"))
+        .and(warp::path::end())
+        .and(task_spawner_filter.clone())
+        .and(chain_filter.clone())
+        .then(
+            |task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
+                task_spawner.blocking_json_task(Priority::P1, move || {
+                    // Calling this endpoint will trigger custody backfill once `effective_epoch`
+                    // is finalized.
+                    let effective_epoch = chain
+                        .canonical_head
+                        .cached_head()
+                        .head_slot()
+                        .epoch(T::EthSpec::slots_per_epoch())
+                        + 1;
+                    let custody_context = chain.data_availability_checker.custody_context();
+                    // Reset validator custody requirements to `effective_epoch` with the latest
+                    // CGC requirements.
+                    custody_context.reset_validator_custody_requirements(effective_epoch);
+                    // Update `DataColumnCustodyInfo` to reflect the custody change.
+                    chain.update_data_column_custody_info(Some(
+                        effective_epoch.start_slot(T::EthSpec::slots_per_epoch()),
+                    ));
+                    Ok(())
+                })
+            },
+        );
+
     // GET lighthouse/analysis/block_rewards
     let get_lighthouse_block_rewards = warp::path("lighthouse")
         .and(warp::path("analysis"))
@@ -4963,6 +4994,7 @@ pub fn serve(
                 .uor(post_lighthouse_compaction)
                 .uor(post_lighthouse_add_peer)
                 .uor(post_lighthouse_remove_peer)
+                .uor(post_lighthouse_custody_backfill)
                 .recover(warp_utils::reject::handle_rejection),
         ),
     )
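The endpoint's epoch arithmetic is worth spelling out, since it is off-by-one-prone: the reset targets the epoch after the head, and `DataColumnCustodyInfo` is moved to that epoch's first slot. A throwaway sketch of the same calculation, assuming mainnet's 32 slots per epoch (the real code uses `T::EthSpec::slots_per_epoch()` and Lighthouse's `Slot`/`Epoch` types):

```rust
const SLOTS_PER_EPOCH: u64 = 32; // mainnet value; the real code asks the chain spec

// Epoch containing a slot.
fn epoch_of(slot: u64) -> u64 {
    slot / SLOTS_PER_EPOCH
}

// First slot of an epoch.
fn start_slot(epoch: u64) -> u64 {
    epoch * SLOTS_PER_EPOCH
}

fn main() {
    let head_slot = 645; // part-way through epoch 20
    // The reset takes effect one epoch after the head, so custody backfill only
    // triggers once that epoch is finalized.
    let effective_epoch = epoch_of(head_slot) + 1;
    assert_eq!(effective_epoch, 21);
    // `DataColumnCustodyInfo` is updated to the first slot of the effective epoch.
    assert_eq!(start_slot(effective_epoch), 672);
}
```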
diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs
index fe9e0dff70..27e2a27d35 100644
--- a/beacon_node/http_api/src/test_utils.rs
+++ b/beacon_node/http_api/src/test_utils.rs
@@ -1,6 +1,7 @@
 use crate::{Config, Context};
 use beacon_chain::{
     BeaconChain, BeaconChainTypes,
+    custody_context::NodeCustodyType,
     test_utils::{BeaconChainHarness, BoxedMutator, Builder, EphemeralHarnessType},
 };
 use beacon_processor::{
@@ -67,6 +68,20 @@ impl<E: EthSpec> InteractiveTester<E> {
             None,
             Config::default(),
             true,
+            NodeCustodyType::Fullnode,
         )
         .await
     }
+
+    pub async fn new_supernode(spec: Option<ChainSpec>, validator_count: usize) -> Self {
+        Self::new_with_initializer_and_mutator(
+            spec,
+            validator_count,
+            None,
+            None,
+            Config::default(),
+            true,
+            NodeCustodyType::Supernode,
+        )
+        .await
+    }
@@ -78,6 +93,7 @@ impl<E: EthSpec> InteractiveTester<E> {
         mutator: Option<Mutator<E>>,
         config: Config,
         use_mock_builder: bool,
+        node_custody_type: NodeCustodyType,
     ) -> Self {
         let mut harness_builder = BeaconChainHarness::builder(E::default())
             .spec_or_default(spec.map(Arc::new))
@@ -93,6 +109,8 @@ impl<E: EthSpec> InteractiveTester<E> {
             .fresh_ephemeral_store()
         };
 
+        harness_builder = harness_builder.node_custody_type(node_custody_type);
+
         // Add a mutator for the beacon chain builder which will be called in
         // `HarnessBuilder::build`.
         if let Some(mutator) = mutator {
diff --git a/beacon_node/http_api/tests/broadcast_validation_tests.rs b/beacon_node/http_api/tests/broadcast_validation_tests.rs
index 9427f6fdf3..82723c2b40 100644
--- a/beacon_node/http_api/tests/broadcast_validation_tests.rs
+++ b/beacon_node/http_api/tests/broadcast_validation_tests.rs
@@ -1,3 +1,4 @@
+use beacon_chain::custody_context::NodeCustodyType;
 use beacon_chain::test_utils::test_spec;
 use beacon_chain::{
     GossipVerifiedBlock, IntoGossipVerifiedBlock, WhenSlotSkipped,
@@ -1956,6 +1957,7 @@ pub async fn duplicate_block_status_code() {
             ..Config::default()
         },
         true,
+        NodeCustodyType::Fullnode,
     )
     .await;
 
diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs
index 62a3461276..50cf866b6a 100644
--- a/beacon_node/http_api/tests/fork_tests.rs
+++ b/beacon_node/http_api/tests/fork_tests.rs
@@ -1,4 +1,5 @@
 //! Tests for API behaviour across fork boundaries.
+use beacon_chain::custody_context::NodeCustodyType;
 use beacon_chain::{
     StateSkipConfig,
     test_utils::{DEFAULT_ETH1_BLOCK_HASH, HARNESS_GENESIS_TIME, RelativeSyncCommittee},
@@ -426,6 +427,7 @@ async fn bls_to_execution_changes_update_all_around_capella_fork() {
         None,
         Default::default(),
         true,
+        NodeCustodyType::Fullnode,
     )
     .await;
     let harness = &tester.harness;
diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs
index a9de737d65..83cb70a7a3 100644
--- a/beacon_node/http_api/tests/interactive_tests.rs
+++ b/beacon_node/http_api/tests/interactive_tests.rs
@@ -1,4 +1,5 @@
 //! Generic tests that make use of the (newer) `InteractiveApiTester`
+use beacon_chain::custody_context::NodeCustodyType;
 use beacon_chain::{
     ChainConfig,
     chain_config::{DisallowedReOrgOffsets, ReOrgThreshold},
@@ -76,6 +77,7 @@ async fn state_by_root_pruned_from_fork_choice() {
         None,
         Default::default(),
         false,
+        NodeCustodyType::Fullnode,
     )
     .await;
 
@@ -433,6 +435,7 @@ pub async fn proposer_boost_re_org_test(
         })),
         Default::default(),
         false,
+        NodeCustodyType::Fullnode,
     )
     .await;
     let harness = &tester.harness;
@@ -1049,6 +1052,68 @@ async fn proposer_duties_with_gossip_tolerance() {
     );
 }
 
+// Test that a request to `lighthouse/custody/backfill` succeeds by verifying that
+// `CustodyContext` and `DataColumnCustodyInfo` have been updated with the correct values.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn lighthouse_restart_custody_backfill() {
+    let spec = test_spec::<E>();
+
+    // Skip pre-Fulu.
+    if !spec.is_fulu_scheduled() {
+        return;
+    }
+
+    let validator_count = 24;
+
+    let tester = InteractiveTester::<E>::new_supernode(Some(spec), validator_count).await;
+    let harness = &tester.harness;
+    let spec = &harness.spec;
+    let client = &tester.client;
+    let min_cgc = spec.custody_requirement;
+    let max_cgc = spec.number_of_custody_groups;
+
+    let num_blocks = 2 * E::slots_per_epoch();
+
+    let custody_context = harness.chain.data_availability_checker.custody_context();
+
+    harness.advance_slot();
+    harness
+        .extend_chain_with_sync(
+            num_blocks as usize,
+            BlockStrategy::OnCanonicalHead,
+            AttestationStrategy::AllValidators,
+            SyncCommitteeStrategy::NoValidators,
+            LightClientStrategy::Disabled,
+        )
+        .await;
+
+    let cgc_at_head = custody_context.custody_group_count_at_head(spec);
+    let earliest_data_column_epoch = harness.chain.earliest_custodied_data_column_epoch();
+
+    assert_eq!(cgc_at_head, max_cgc);
+    assert_eq!(earliest_data_column_epoch, None);
+
+    custody_context
+        .update_and_backfill_custody_count_at_epoch(harness.chain.epoch().unwrap(), cgc_at_head);
+    client.post_lighthouse_custody_backfill().await.unwrap();
+
+    let cgc_at_head = custody_context.custody_group_count_at_head(spec);
+    let cgc_at_previous_epoch =
+        custody_context.custody_group_count_at_epoch(harness.chain.epoch().unwrap() - 1, spec);
+    let earliest_data_column_epoch = harness.chain.earliest_custodied_data_column_epoch();
+
+    // `DataColumnCustodyInfo` should have been updated to the epoch after the head epoch.
+    assert_eq!(
+        earliest_data_column_epoch,
+        Some(harness.chain.epoch().unwrap() + 1)
+    );
+    // CGC requirements should have stayed the same at head.
+    assert_eq!(cgc_at_head, max_cgc);
+    // CGC requirements at the previous epoch should be `min_cgc`.
+    // This allows custody backfill to re-fetch columns for this epoch.
+    assert_eq!(cgc_at_previous_epoch, min_cgc);
+}
+
 // Test that a request for next epoch proposer duties suceeds when the current slot clock is within
 // gossip clock disparity (500ms) of the new epoch.
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
diff --git a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs
index 5c5505083f..bb2c6799f1 100644
--- a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs
+++ b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs
@@ -382,11 +382,9 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
             return None;
         };
 
-        let mut missing_columns = HashSet::new();
-
         // Skip all batches (Epochs) that don't have missing columns.
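        // (The loop below rebinds `missing_columns` per epoch instead of mutating
        // one shared binding declared outside the loop.)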
        for epoch in Epoch::range_inclusive_rev(self.to_be_downloaded, column_da_boundary) {
-            missing_columns = self.beacon_chain.get_missing_columns_for_epoch(epoch);
+            let missing_columns = self.beacon_chain.get_missing_columns_for_epoch(epoch);
 
             if !missing_columns.is_empty() {
                 self.to_be_downloaded = epoch;
@@ -445,6 +443,7 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
                 self.include_next_batch()
             }
             Entry::Vacant(entry) => {
+                let missing_columns = self.beacon_chain.get_missing_columns_for_epoch(batch_id);
                 entry.insert(BatchInfo::new(
                     &batch_id,
                     CUSTODY_BACKFILL_EPOCHS_PER_BATCH,
diff --git a/beacon_node/network/src/sync/range_data_column_batch_request.rs b/beacon_node/network/src/sync/range_data_column_batch_request.rs
index 542d99d97c..72e2fb2d5b 100644
--- a/beacon_node/network/src/sync/range_data_column_batch_request.rs
+++ b/beacon_node/network/src/sync/range_data_column_batch_request.rs
@@ -70,16 +70,17 @@ impl<E: EthSpec> RangeDataColumnBatchRequest<E> {
             HashMap::new();
         let mut column_to_peer_id: HashMap<ColumnIndex, PeerId> = HashMap::new();
 
-        for column in self
-            .requests
-            .values()
-            .filter_map(|req| req.to_finished())
-            .flatten()
-        {
-            received_columns_for_slot
-                .entry(column.slot())
-                .or_default()
-                .push(column.clone());
+        for req in self.requests.values() {
+            let Some(columns) = req.to_finished() else {
+                return None;
+            };
+
+            for column in columns {
+                received_columns_for_slot
+                    .entry(column.slot())
+                    .or_default()
+                    .push(column.clone());
+            }
         }
 
         // Note: this assumes that only 1 peer is responsible for a column

This hunk is the core of the fix. The old `filter_map(...).flatten()` silently skipped any request that had not finished, so a batch served by two peers could be verified and imported with only half of its columns. The rewritten loop couples the responses: one unfinished request makes the whole batch unavailable. A minimal sketch of that all-or-nothing pattern, with the request and column types stubbed out (the real code uses Lighthouse's per-peer request objects and `DataColumnSidecar`s):

```rust
use std::collections::HashMap;

/// Simplified stand-in for a per-peer column request: `Some` once complete.
struct ColumnRequest {
    finished: Option<Vec<(u64 /* slot */, u64 /* column index */)>>,
}

impl ColumnRequest {
    fn to_finished(&self) -> Option<&Vec<(u64, u64)>> {
        self.finished.as_ref()
    }
}

/// Couple responses across all requests: return `None` unless *every* request
/// has finished, so partially downloaded batches are never verified or imported.
fn responses_if_complete(requests: &[ColumnRequest]) -> Option<HashMap<u64, Vec<u64>>> {
    let mut received_columns_for_slot: HashMap<u64, Vec<u64>> = HashMap::new();
    for req in requests {
        // A single in-flight request makes the whole batch unavailable.
        let columns = req.to_finished()?;
        for (slot, index) in columns {
            received_columns_for_slot.entry(*slot).or_default().push(*index);
        }
    }
    Some(received_columns_for_slot)
}

fn main() {
    let done = ColumnRequest { finished: Some(vec![(640, 3), (641, 3)]) };
    let pending = ColumnRequest { finished: None };
    // With one peer still in flight, the batch yields nothing.
    assert!(responses_if_complete(&[done, pending]).is_none());

    // Once every request has finished, columns are grouped by slot.
    let a = ColumnRequest { finished: Some(vec![(640, 3)]) };
    let b = ColumnRequest { finished: Some(vec![(640, 7), (641, 7)]) };
    let grouped = responses_if_complete(&[a, b]).unwrap();
    assert_eq!(grouped[&640], vec![3, 7]);
}
```

The `?` here mirrors the patch's `let ... else { return None }`: incompleteness propagates to the whole batch instead of being dropped on the floor.

diff --git a/book/src/api_lighthouse.md b/book/src/api_lighthouse.md
index 2e694989f9..f804cb9df2 100644
--- a/book/src/api_lighthouse.md
+++ b/book/src/api_lighthouse.md
@@ -447,6 +447,16 @@ indicating that all states with slots `>= 0` are available, i.e., full state his
 on the specific meanings of these fields see the docs on
 [Checkpoint Sync](./advanced_checkpoint_sync.md#how-to-run-an-archived-node).
 
+## `/lighthouse/custody/backfill`
+
+Starts a custody backfill sync from the next epoch with the node's latest custody requirements.
+The sync won't begin immediately; it waits until the next epoch is finalized before triggering.
+
+This endpoint should only be used to fix nodes that may have partial custody columns due to a
+prior backfill bug (present in v8.0.0-rc.2). Use it with caution, as it re-downloads all
+historic custody data columns and may consume significant bandwidth.
+
+```bash
+curl -X POST "http://localhost:5052/lighthouse/custody/backfill"
+```
+
 ## `/lighthouse/merge_readiness`
 
 Returns the current difficulty and terminal total difficulty of the network. Before
 [The Merge](https://ethereum.org/en/roadmap/merge/) on 15th September 2022, you will see that
 the current difficulty is less than the terminal total difficulty, An example is shown below:
diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs
index f65b5a07b6..4f9a049e44 100644
--- a/common/eth2/src/lighthouse.rs
+++ b/common/eth2/src/lighthouse.rs
@@ -208,6 +208,19 @@ impl BeaconNodeHttpClient {
         self.get(path).await
     }
 
+    /// `POST lighthouse/custody/backfill`
+    pub async fn post_lighthouse_custody_backfill(&self) -> Result<(), Error> {
+        let mut path = self.server.full.clone();
+
+        path.path_segments_mut()
+            .map_err(|()| Error::InvalidUrl(self.server.clone()))?
+            .push("lighthouse")
+            .push("custody")
+            .push("backfill");
+
+        self.post(path, &()).await
+    }
+
     /*
      * Note: *