From 33e21634cb8842f396e4537342b4c84dc5095649 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 21 Oct 2025 20:51:34 -0700 Subject: [PATCH] Custody backfill sync (#7907) #7603 #### Custody backfill sync service Similar in many ways to the current backfill service. There may be ways to unify the two services. The difficulty there is that the current backfill service tightly couples blocks and their associated blobs/data columns. Any attempts to unify the two services should be left to a separate PR in my opinion. #### `SyncNetworkContext` `SyncNetworkContext` manages custody sync data columns by range requests separately from other sync RPC requests. I think this is a nice separation considering that custody backfill is its own service. #### Data column import logic The import logic verifies KZG commitments and that the data column's block root matches the block root in the node's store before importing columns. #### New channel to send messages to `SyncManager` Now external services can communicate with the `SyncManager`. In this PR this channel is used to trigger a custody sync. Alternatively we may be able to use the existing `mpsc` channel that the `SyncNetworkContext` uses to communicate with the `SyncManager`. I will spend some time reviewing this. 
Co-Authored-By: Eitan Seri-Levi Co-Authored-By: Eitan Seri- Levi Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> --- beacon_node/beacon_chain/src/beacon_chain.rs | 89 ++ beacon_node/beacon_chain/src/errors.rs | 1 + .../src/historical_data_columns.rs | 151 +++ beacon_node/beacon_chain/src/lib.rs | 1 + .../beacon_chain/src/validator_custody.rs | 38 +- beacon_node/beacon_chain/tests/store_tests.rs | 401 ++++++ beacon_node/client/src/notifier.rs | 95 ++ beacon_node/http_api/src/lib.rs | 4 +- .../src/service/api_types.rs | 55 +- .../lighthouse_network/src/types/globals.rs | 6 + .../lighthouse_network/src/types/mod.rs | 2 +- beacon_node/lighthouse_tracing/src/lib.rs | 4 + beacon_node/network/src/metrics.rs | 16 + .../src/network_beacon_processor/mod.rs | 17 + .../network_beacon_processor/sync_methods.rs | 107 +- .../network/src/sync/backfill_sync/mod.rs | 40 +- .../src/sync/{range_sync => }/batch.rs | 209 +-- .../src/sync/block_sidecar_coupling.rs | 65 +- .../src/sync/custody_backfill_sync/mod.rs | 1126 +++++++++++++++++ beacon_node/network/src/sync/manager.rs | 175 ++- beacon_node/network/src/sync/mod.rs | 5 +- .../network/src/sync/network_context.rs | 126 +- .../sync/range_data_column_batch_request.rs | 297 +++++ .../network/src/sync/range_sync/chain.rs | 52 +- .../network/src/sync/range_sync/mod.rs | 8 +- .../network/src/sync/range_sync/range.rs | 3 +- beacon_node/store/src/hot_cold_store.rs | 13 + common/eth2/src/lighthouse/sync_state.rs | 35 +- consensus/types/src/slot_epoch.rs | 7 + scripts/tests/checkpoint-sync.sh | 10 +- 30 files changed, 2958 insertions(+), 200 deletions(-) create mode 100644 beacon_node/beacon_chain/src/historical_data_columns.rs rename beacon_node/network/src/sync/{range_sync => }/batch.rs (82%) create mode 100644 beacon_node/network/src/sync/custody_backfill_sync/mod.rs create mode 100644 beacon_node/network/src/sync/range_data_column_batch_request.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs 
b/beacon_node/beacon_chain/src/beacon_chain.rs index e299bea2da..152de1a20b 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -6991,6 +6991,95 @@ impl BeaconChain { } } + /// Safely update data column custody info by ensuring that: + /// - cgc values at the updated epoch and the earliest custodied column epoch are equal + /// - we are only decrementing the earliest custodied data column epoch by one epoch + /// - the new earliest data column slot is set to the first slot in `effective_epoch`. + pub fn safely_backfill_data_column_custody_info( + &self, + effective_epoch: Epoch, + ) -> Result<(), Error> { + let Some(earliest_data_column_epoch) = self.earliest_custodied_data_column_epoch() else { + return Ok(()); + }; + + if effective_epoch >= earliest_data_column_epoch { + return Ok(()); + } + + let cgc_at_effective_epoch = self + .data_availability_checker + .custody_context() + .custody_group_count_at_epoch(effective_epoch, &self.spec); + + let cgc_at_earliest_data_colum_epoch = self + .data_availability_checker + .custody_context() + .custody_group_count_at_epoch(earliest_data_column_epoch, &self.spec); + + let can_update_data_column_custody_info = cgc_at_effective_epoch + == cgc_at_earliest_data_colum_epoch + && effective_epoch == earliest_data_column_epoch - 1; + + if can_update_data_column_custody_info { + self.store.put_data_column_custody_info(Some( + effective_epoch.start_slot(T::EthSpec::slots_per_epoch()), + ))?; + } else { + error!( + ?cgc_at_effective_epoch, + ?cgc_at_earliest_data_colum_epoch, + ?effective_epoch, + ?earliest_data_column_epoch, + "Couldn't update data column custody info" + ); + return Err(Error::FailedColumnCustodyInfoUpdate); + } + + Ok(()) + } + + /// Compare columns custodied for `epoch` versus columns custodied for the head of the chain + /// and return any column indices that are missing. 
+ pub fn get_missing_columns_for_epoch(&self, epoch: Epoch) -> HashSet { + let custody_context = self.data_availability_checker.custody_context(); + + let columns_required = custody_context + .custody_columns_for_epoch(None, &self.spec) + .iter() + .cloned() + .collect::>(); + + let current_columns_at_epoch = custody_context + .custody_columns_for_epoch(Some(epoch), &self.spec) + .iter() + .cloned() + .collect::>(); + + columns_required + .difference(¤t_columns_at_epoch) + .cloned() + .collect::>() + } + + /// The da boundary for custodying columns. It will just be the DA boundary unless we are near the Fulu fork epoch. + pub fn get_column_da_boundary(&self) -> Option { + match self.data_availability_boundary() { + Some(da_boundary_epoch) => { + if let Some(fulu_fork_epoch) = self.spec.fulu_fork_epoch { + if da_boundary_epoch < fulu_fork_epoch { + Some(fulu_fork_epoch) + } else { + Some(da_boundary_epoch) + } + } else { + None + } + } + None => None, // If no DA boundary set, dont try to custody backfill + } + } + /// This method serves to get a sense of the current chain health. It is used in block proposal /// to determine whether we should outsource payload production duties. 
/// diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 7b04a36fae..d4eba2b0ea 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -247,6 +247,7 @@ pub enum BeaconChainError { cache_epoch: Epoch, }, SkipProposerPreparation, + FailedColumnCustodyInfoUpdate, } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/beacon_chain/src/historical_data_columns.rs b/beacon_node/beacon_chain/src/historical_data_columns.rs new file mode 100644 index 0000000000..7e196eb75e --- /dev/null +++ b/beacon_node/beacon_chain/src/historical_data_columns.rs @@ -0,0 +1,151 @@ +use std::collections::{HashMap, HashSet}; + +use crate::{ + BeaconChain, BeaconChainError, BeaconChainTypes, + data_column_verification::verify_kzg_for_data_column_list, +}; +use store::{Error as StoreError, KeyValueStore}; +use tracing::{Span, debug, instrument}; +use types::{ColumnIndex, DataColumnSidecarList, Epoch, EthSpec, Hash256, Slot}; + +#[derive(Debug)] +pub enum HistoricalDataColumnError { + // The provided data column sidecar pertains to a block that doesn't exist in the database. + NoBlockFound { + data_column_block_root: Hash256, + expected_block_root: Hash256, + }, + + /// Logic error: should never occur. + IndexOutOfBounds, + + /// The provided data column sidecar list doesn't contain columns for the full range of slots for the given epoch. + MissingDataColumns { + missing_slots_and_data_columns: Vec<(Slot, ColumnIndex)>, + }, + + /// The provided data column sidecar list contains at least one column with an invalid kzg commitment. + InvalidKzg, + + /// Internal store error + StoreError(StoreError), + + /// Internal beacon chain error + BeaconChainError(Box), +} + +impl From for HistoricalDataColumnError { + fn from(e: StoreError) -> Self { + Self::StoreError(e) + } +} + +impl BeaconChain { + /// Store a batch of historical data columns in the database. 
+ /// + /// The data columns block roots and proposer signatures are verified with the existing + /// block stored in the DB. This function also verifies the columns KZG committments. + /// + /// This function requires that the data column sidecar list contains columns for a full epoch. + /// + /// Return the number of `data_columns` successfully imported. + #[instrument(skip_all, fields(columns_imported_count = tracing::field::Empty ))] + pub fn import_historical_data_column_batch( + &self, + epoch: Epoch, + historical_data_column_sidecar_list: DataColumnSidecarList, + ) -> Result { + let mut total_imported = 0; + let mut ops = vec![]; + + let unique_column_indices = historical_data_column_sidecar_list + .iter() + .map(|item| item.index) + .collect::>(); + + let mut slot_and_column_index_to_data_columns = historical_data_column_sidecar_list + .iter() + .map(|data_column| ((data_column.slot(), data_column.index), data_column)) + .collect::>(); + + let forward_blocks_iter = self + .forwards_iter_block_roots_until( + epoch.start_slot(T::EthSpec::slots_per_epoch()), + epoch.end_slot(T::EthSpec::slots_per_epoch()), + ) + .map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?; + + for block_iter_result in forward_blocks_iter { + let (block_root, slot) = block_iter_result + .map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?; + + for column_index in unique_column_indices.clone() { + if let Some(data_column) = + slot_and_column_index_to_data_columns.remove(&(slot, column_index)) + { + if self + .store + .get_data_column(&block_root, &data_column.index)? 
+ .is_some() + { + debug!( + block_root = ?block_root, + column_index = data_column.index, + "Skipping data column import as identical data column exists" + ); + continue; + } + if block_root != data_column.block_root() { + return Err(HistoricalDataColumnError::NoBlockFound { + data_column_block_root: data_column.block_root(), + expected_block_root: block_root, + }); + } + self.store.data_column_as_kv_store_ops( + &block_root, + data_column.clone(), + &mut ops, + ); + total_imported += 1; + } + } + } + + // If we've made it to here with no columns to import, this means there are no blobs for this epoch. + // `RangeDataColumnBatchRequest` logic should have caught any bad peers withholding columns + if historical_data_column_sidecar_list.is_empty() { + if !ops.is_empty() { + // This shouldn't be a valid case. If there are no columns to import, + // there should be no generated db operations. + return Err(HistoricalDataColumnError::IndexOutOfBounds); + } + } else { + verify_kzg_for_data_column_list(historical_data_column_sidecar_list.iter(), &self.kzg) + .map_err(|_| HistoricalDataColumnError::InvalidKzg)?; + + self.store.blobs_db.do_atomically(ops)?; + } + + if !slot_and_column_index_to_data_columns.is_empty() { + debug!( + ?epoch, + extra_data = ?slot_and_column_index_to_data_columns.keys().map(|(slot, _)| slot), + "We've received unexpected extra data columns, these will not be imported" + ); + } + + self.data_availability_checker + .custody_context() + .update_and_backfill_custody_count_at_epoch(epoch); + + self.safely_backfill_data_column_custody_info(epoch) + .map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?; + + debug!(?epoch, total_imported, "Imported historical data columns"); + + let current_span = Span::current(); + current_span.record("columns_imported_count", total_imported); + + Ok(total_imported) + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 9d8c3dba38..fd2162e7d3 100644 --- 
a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -28,6 +28,7 @@ pub mod fork_choice_signal; pub mod fork_revert; pub mod graffiti_calculator; pub mod historical_blocks; +pub mod historical_data_columns; pub mod kzg_utils; pub mod light_client_finality_update_verification; pub mod light_client_optimistic_update_verification; diff --git a/beacon_node/beacon_chain/src/validator_custody.rs b/beacon_node/beacon_chain/src/validator_custody.rs index 3ab76828c9..ea1dfdaae0 100644 --- a/beacon_node/beacon_chain/src/validator_custody.rs +++ b/beacon_node/beacon_chain/src/validator_custody.rs @@ -10,7 +10,7 @@ use types::data_column_custody_group::{CustodyIndex, compute_columns_for_custody use types::{ChainSpec, ColumnIndex, Epoch, EthSpec, Slot}; /// A delay before making the CGC change effective to the data availability checker. -const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30; +pub const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30; /// Number of slots after which a validator's registration is removed if it has not re-registered. const VALIDATOR_REGISTRATION_EXPIRY_SLOTS: Slot = Slot::new(256); @@ -30,8 +30,10 @@ struct ValidatorRegistrations { /// /// Note: Only stores the epoch value when there's a change in custody requirement. /// So if epoch 10 and 11 has the same custody requirement, only 10 is stored. - /// This map is never pruned, because currently we never decrease custody requirement, so this - /// map size is contained at 128. + /// This map is only pruned during custody backfill. If epoch 11 has custody requirements + /// that are then backfilled to epoch 10, the value at epoch 11 will be removed and epoch 10 + /// will be added to the map instead. This should keep map size constrained to a maximum + /// value of 128. 
epoch_validator_custody_requirements: BTreeMap, } @@ -99,6 +101,25 @@ impl ValidatorRegistrations { None } } + + /// Updates the `epoch_validator_custody_requirements` map by pruning all values on/after `effective_epoch` + /// and updating the map to store the latest validator custody requirements for the `effective_epoch`. + pub fn backfill_validator_custody_requirements(&mut self, effective_epoch: Epoch) { + if let Some(latest_validator_custody) = self.latest_validator_custody_requirement() { + // Delete records if + // 1. The epoch is greater than or equal than `effective_epoch` + // 2. the cgc requirements match the latest validator custody requirements + self.epoch_validator_custody_requirements + .retain(|&epoch, custody_requirement| { + !(epoch >= effective_epoch && *custody_requirement == latest_validator_custody) + }); + + self.epoch_validator_custody_requirements + .entry(effective_epoch) + .and_modify(|old_custody| *old_custody = latest_validator_custody) + .or_insert(latest_validator_custody); + } + } } /// Given the `validator_custody_units`, return the custody requirement based on @@ -250,6 +271,7 @@ impl CustodyContext { ); return Some(CustodyCountChanged { new_custody_group_count: updated_cgc, + old_custody_group_count: current_cgc, sampling_count: self.num_of_custody_groups_to_sample(effective_epoch, spec), effective_epoch, }); @@ -282,7 +304,7 @@ impl CustodyContext { /// minimum sampling size which may exceed the custody group count (CGC). /// /// See also: [`Self::num_of_custody_groups_to_sample`]. 
- fn custody_group_count_at_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> u64 { + pub fn custody_group_count_at_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> u64 { if self.current_is_supernode { spec.number_of_custody_groups } else { @@ -360,14 +382,22 @@ impl CustodyContext { .all_custody_columns_ordered .get() .expect("all_custody_columns_ordered should be initialized"); + &all_columns_ordered[..custody_group_count] } + + pub fn update_and_backfill_custody_count_at_epoch(&self, effective_epoch: Epoch) { + self.validator_registrations + .write() + .backfill_validator_custody_requirements(effective_epoch); + } } /// The custody count changed because of a change in the /// number of validators being managed. pub struct CustodyCountChanged { pub new_custody_group_count: u64, + pub old_custody_group_count: u64, pub sampling_count: u64, pub effective_epoch: Epoch, } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index ec5c1c90db..69d16b3071 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -4,12 +4,14 @@ use beacon_chain::attestation_verification::Error as AttnError; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::builder::BeaconChainBuilder; use beacon_chain::data_availability_checker::AvailableBlock; +use beacon_chain::historical_data_columns::HistoricalDataColumnError; use beacon_chain::schema_change::migrate_schema; use beacon_chain::test_utils::SyncCommitteeStrategy; use beacon_chain::test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, DiskHarnessType, get_kzg, mock_execution_layer_from_parts, test_spec, }; +use beacon_chain::validator_custody::CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS; use beacon_chain::{ BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot, BlockError, ChainConfig, NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped, @@ -3169,6 +3171,245 @@ async fn 
weak_subjectivity_sync_test( assert_eq!(store.get_anchor_info().state_upper_limit, Slot::new(0)); } +#[tokio::test] +async fn test_import_historical_data_columns_batch() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Epoch::new(0).start_slot(E::slots_per_epoch()) + 1; + let end_slot = Epoch::new(0).end_slot(E::slots_per_epoch()); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + for data_column in data_columns.unwrap() { + data_columns_list.push(data_column); + } + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + harness + .chain + .import_historical_data_column_batch(Epoch::new(0), data_columns_list) + .unwrap(); + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in 
block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()) + } +} + +// This should verify that a data column sidecar containing mismatched block roots should fail to be imported. +#[tokio::test] +async fn test_import_historical_data_columns_batch_mismatched_block_root() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Slot::new(1); + let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + + for data_column in data_columns.unwrap() { + let mut data_column = (*data_column).clone(); + if data_column.index % 2 == 0 { + data_column.signed_block_header.message.body_root = Hash256::ZERO; + } + + data_columns_list.push(Arc::new(data_column)); + } + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let 
data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + let error = harness + .chain + .import_historical_data_column_batch( + start_slot.epoch(E::slots_per_epoch()), + data_columns_list, + ) + .unwrap_err(); + + assert!(matches!( + error, + HistoricalDataColumnError::NoBlockFound { .. } + )); +} + +// This should verify that a data column sidecar associated to a block root that doesn't exist in the store cannot +// be imported. +#[tokio::test] +async fn test_import_historical_data_columns_batch_no_block_found() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Slot::new(1); + let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + + for data_column in data_columns.unwrap() { + let mut data_column = (*data_column).clone(); + data_column.signed_block_header.message.body_root = Hash256::ZERO; + data_columns_list.push(Arc::new(data_column)); + } + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + 
.forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + let error = harness + .chain + .import_historical_data_column_batch(Epoch::new(0), data_columns_list) + .unwrap_err(); + + assert!(matches!( + error, + HistoricalDataColumnError::NoBlockFound { .. } + )); +} + /// Test that blocks and attestations that refer to states around an unaligned split state are /// processed correctly. #[tokio::test] @@ -4845,6 +5086,166 @@ async fn test_custody_column_filtering_supernode() { ); } +#[tokio::test] +async fn test_missing_columns_after_cgc_change() { + let spec = test_spec::(); + + let num_validators = 8; + + let num_epochs_before_increase = 4; + + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec.clone().into()) + .deterministic_keypairs(num_validators) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + let state = harness.chain.head_beacon_state_cloned(); + + if !state.fork_name_unchecked().fulu_enabled() { + return; + } + + let custody_context = harness.chain.data_availability_checker.custody_context(); + + harness.advance_slot(); + harness + .extend_chain( + (E::slots_per_epoch() * num_epochs_before_increase) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let epoch_before_increase = Epoch::new(num_epochs_before_increase); + + let missing_columns = harness + .chain + .get_missing_columns_for_epoch(epoch_before_increase); + + // We should have no missing columns + assert_eq!(missing_columns.len(), 0); + + let epoch_after_increase = Epoch::new(num_epochs_before_increase + 2); + + let cgc_change_slot = epoch_before_increase.end_slot(E::slots_per_epoch()); + custody_context.register_validators(vec![(1, 32_000_000_000 * 9)], cgc_change_slot, &spec); + + harness.advance_slot(); + 
harness + .extend_chain( + (E::slots_per_epoch() * 5) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // We should have missing columns from before the cgc increase + let missing_columns = harness + .chain + .get_missing_columns_for_epoch(epoch_before_increase); + + assert!(!missing_columns.is_empty()); + + // We should have no missing columns after the cgc increase + let missing_columns = harness + .chain + .get_missing_columns_for_epoch(epoch_after_increase); + + assert!(missing_columns.is_empty()); +} + +#[tokio::test] +async fn test_safely_backfill_data_column_custody_info() { + let spec = test_spec::(); + + let num_validators = 8; + + let start_epochs = 4; + + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec.clone().into()) + .deterministic_keypairs(num_validators) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + let state = harness.chain.head_beacon_state_cloned(); + + if !state.fork_name_unchecked().fulu_enabled() { + return; + } + + let custody_context = harness.chain.data_availability_checker.custody_context(); + + harness.advance_slot(); + harness + .extend_chain( + (E::slots_per_epoch() * start_epochs) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let epoch_before_increase = Epoch::new(start_epochs); + let effective_delay_slots = + CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS / harness.chain.spec.seconds_per_slot; + + let cgc_change_slot = epoch_before_increase.end_slot(E::slots_per_epoch()); + + custody_context.register_validators(vec![(1, 32_000_000_000 * 16)], cgc_change_slot, &spec); + + let epoch_after_increase = + (cgc_change_slot + effective_delay_slots).epoch(E::slots_per_epoch()); + + harness.advance_slot(); + harness + .extend_chain( + (E::slots_per_epoch() * 5) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let head_slot = 
harness.chain.head().snapshot.beacon_block.slot(); + + harness + .chain + .update_data_column_custody_info(Some(head_slot)); + + // We can only safely update custody column info 1 epoch at a time + // Skipping an epoch should return an error + harness + .chain + .safely_backfill_data_column_custody_info(head_slot.epoch(E::slots_per_epoch()) - 2) + .unwrap_err(); + + // Iterate from the head epoch back to 0 and try to backfill data column custody info + for epoch in (0..head_slot.epoch(E::slots_per_epoch()).into()).rev() { + // This is an epoch before the cgc change took into effect, we shouldnt be able to update + // without performing custody backfill sync + if epoch <= epoch_after_increase.into() { + harness + .chain + .safely_backfill_data_column_custody_info(Epoch::new(epoch)) + .unwrap_err(); + } else { + // This is an epoch after the cgc change took into effect, we should be able to update + // as long as we iterate epoch by epoch + harness + .chain + .safely_backfill_data_column_custody_info(Epoch::new(epoch)) + .unwrap(); + let earliest_available_epoch = harness + .chain + .earliest_custodied_data_column_epoch() + .unwrap(); + assert_eq!(Epoch::new(epoch), earliest_available_epoch); + } + } +} + /// Checks that two chains are the same, for the purpose of these tests. /// /// Several fields that are hard/impossible to check are ignored (e.g., the store). diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index c83cdad7e0..10d9587ccc 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -57,6 +57,9 @@ pub fn spawn_notifier( // Store info if we are required to do a backfill sync. let original_oldest_block_slot = beacon_chain.store.get_anchor_info().oldest_block_slot; + // Use this info during custody backfill sync. + let mut original_earliest_data_column_slot = None; + let interval_future = async move { // Perform pre-genesis logging. 
loop { @@ -80,6 +83,7 @@ pub fn spawn_notifier( // Perform post-genesis logging. let mut last_backfill_log_slot = None; + let mut last_custody_backfill_log_slot = None; loop { // Run the notifier half way through each slot. @@ -112,6 +116,18 @@ pub fn spawn_notifier( let mut speedo = speedo.lock().await; speedo.clear(); } + (_, SyncState::CustodyBackFillSyncing { .. }) => { + // We have transitioned to a custody backfill sync. Reset the speedo. + let mut speedo = speedo.lock().await; + last_custody_backfill_log_slot = None; + speedo.clear(); + } + (SyncState::CustodyBackFillSyncing { .. }, _) => { + // We have transitioned from a custody backfill sync, reset the speedo + let mut speedo = speedo.lock().await; + last_custody_backfill_log_slot = None; + speedo.clear(); + } (_, _) => {} } current_sync_state = sync_state; @@ -154,6 +170,38 @@ pub fn spawn_notifier( Instant::now(), ); } + SyncState::CustodyBackFillSyncing { .. } => { + match beacon_chain.store.get_data_column_custody_info() { + Ok(data_column_custody_info) => { + if let Some(earliest_data_column_slot) = data_column_custody_info + .and_then(|info| info.earliest_data_column_slot) + && let Some(da_boundary) = beacon_chain.get_column_da_boundary() + { + sync_distance = earliest_data_column_slot.saturating_sub( + da_boundary.start_slot(T::EthSpec::slots_per_epoch()), + ); + + // We keep track of our starting point for custody backfill sync + // so we can measure our speed of progress. + if original_earliest_data_column_slot.is_none() { + original_earliest_data_column_slot = + Some(earliest_data_column_slot) + } + + if let Some(original_earliest_data_column_slot) = + original_earliest_data_column_slot + { + speedo.observe( + original_earliest_data_column_slot + .saturating_sub(earliest_data_column_slot), + Instant::now(), + ); + } + } + } + Err(e) => error!(error=?e, "Unable to get data column custody info"), + } + } SyncState::SyncingFinalized { .. } | SyncState::SyncingHead { .. 
} | SyncState::SyncTransition => { @@ -190,6 +238,8 @@ pub fn spawn_notifier( // Log if we are backfilling. let is_backfilling = matches!(current_sync_state, SyncState::BackFillSyncing { .. }); + let is_custody_backfilling = + matches!(current_sync_state, SyncState::CustodyBackFillSyncing { .. }); if is_backfilling && last_backfill_log_slot .is_none_or(|slot| slot + BACKFILL_LOG_INTERVAL <= current_slot) @@ -234,6 +284,51 @@ pub fn spawn_notifier( info!("Historical block download complete"); } + if is_custody_backfilling + && last_custody_backfill_log_slot + .is_none_or(|slot| slot + BACKFILL_LOG_INTERVAL <= current_slot) + { + last_custody_backfill_log_slot = Some(current_slot); + + let distance = format!( + "{} slots ({})", + sync_distance.as_u64(), + slot_distance_pretty(sync_distance, slot_duration) + ); + + let speed = speedo.slots_per_second(); + let display_speed = speed.is_some_and(|speed| speed != 0.0); + + if display_speed { + info!( + distance, + speed = sync_speed_pretty(speed), + est_time = + estimated_time_pretty(beacon_chain.get_column_da_boundary().and_then( + |da_boundary| speedo.estimated_time_till_slot( + da_boundary.start_slot(T::EthSpec::slots_per_epoch()) + ) + )), + "Downloading historical data columns" + ); + } else { + info!( + distance, + est_time = + estimated_time_pretty(beacon_chain.get_column_da_boundary().and_then( + |da_boundary| speedo.estimated_time_till_slot( + da_boundary.start_slot(T::EthSpec::slots_per_epoch()) + ) + )), + "Downloading historical data columns" + ); + } + } else if !is_custody_backfilling && last_custody_backfill_log_slot.is_some() { + last_custody_backfill_log_slot = None; + original_earliest_data_column_slot = None; + info!("Historical data column download complete"); + } + // Log if we are syncing if current_sync_state.is_syncing() { metrics::set_gauge(&metrics::IS_SYNCED, 0); diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 170012b04b..f6d8dbc157 100644 --- 
a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -478,7 +478,9 @@ pub fn serve( ))) } } - SyncState::SyncTransition | SyncState::BackFillSyncing { .. } => Ok(()), + SyncState::SyncTransition + | SyncState::BackFillSyncing { .. } + | SyncState::CustodyBackFillSyncing { .. } => Ok(()), SyncState::Synced => Ok(()), SyncState::Stalled => Ok(()), } diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 0f5fd99c27..f1a4d87de7 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -60,8 +60,8 @@ pub struct BlobsByRangeRequestId { pub struct DataColumnsByRangeRequestId { /// Id to identify this attempt at a data_columns_by_range request for `parent_request_id` pub id: Id, - /// The Id of the overall By Range request for block components. - pub parent_request_id: ComponentsByRangeRequestId, + /// The Id of the overall By Range request for either a components by range request or a custody backfill request. + pub parent_request_id: DataColumnsByRangeRequester, /// The peer id associated with the request. /// /// This is useful to penalize the peer at a later point if it returned data columns that @@ -69,6 +69,12 @@ pub struct DataColumnsByRangeRequestId { pub peer: PeerId, } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub enum DataColumnsByRangeRequester { + ComponentsByRange(ComponentsByRangeRequestId), + CustodyBackfillSync(CustodyBackFillBatchRequestId), +} + /// Block components by range request for range sync. Includes an ID for downstream consumers to /// handle retries and tie all their sub requests together. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -80,6 +86,24 @@ pub struct ComponentsByRangeRequestId { pub requester: RangeRequestId, } +/// A batch of data columns by range request for custody sync. 
Includes an ID for downstream consumers to +/// handle retries and tie all the range requests for the given epoch together. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct CustodyBackFillBatchRequestId { + /// For each `epoch` we may request the same data in a later retry. This Id identifies the + /// current attempt. + pub id: Id, + pub batch_id: CustodyBackfillBatchId, +} + +/// Custody backfill may be restarted and sync each epoch multiple times in different runs. Identify +/// each batch by epoch and run_id for uniqueness. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct CustodyBackfillBatchId { + pub epoch: Epoch, + pub run_id: u64, +} + /// Range sync chain or backfill batch #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum RangeRequestId { @@ -217,6 +241,8 @@ impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester); impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester); impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id); impl_display!(CustodyId, "{}", requester); +impl_display!(CustodyBackFillBatchRequestId, "{}/{}", id, batch_id); +impl_display!(CustodyBackfillBatchId, "{}/{}", epoch, run_id); impl Display for DataColumnsByRootRequester { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -241,6 +267,15 @@ impl Display for RangeRequestId { } } +impl Display for DataColumnsByRangeRequester { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::ComponentsByRange(id) => write!(f, "ByRange/{id}"), + Self::CustodyBackfillSync(id) => write!(f, "CustodyBackfill/{id}"), + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -263,15 +298,17 @@ mod tests { fn display_id_data_columns_by_range() { let id = DataColumnsByRangeRequestId { id: 123, - parent_request_id: ComponentsByRangeRequestId { - id: 122, - requester: RangeRequestId::RangeSync { - chain_id: 54, - batch_id: Epoch::new(0), + parent_request_id: 
DataColumnsByRangeRequester::ComponentsByRange( + ComponentsByRangeRequestId { + id: 122, + requester: RangeRequestId::RangeSync { + chain_id: 54, + batch_id: Epoch::new(0), + }, }, - }, + ), peer: PeerId::random(), }; - assert_eq!(format!("{id}"), "123/122/RangeSync/0/54"); + assert_eq!(format!("{id}"), "123/ByRange/122/RangeSync/0/54"); } } diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index b8c34f8392..2a3571c3b7 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -4,6 +4,7 @@ use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV3}; use crate::types::{BackFillState, SyncState}; use crate::{Client, Enr, GossipTopic, Multiaddr, NetworkConfig, PeerId}; +use eth2::lighthouse::sync_state::CustodyBackFillState; use network_utils::enr_ext::EnrExt; use parking_lot::RwLock; use std::collections::HashSet; @@ -29,6 +30,8 @@ pub struct NetworkGlobals { pub sync_state: RwLock, /// The current state of the backfill sync. pub backfill_state: RwLock, + /// The current state of custody sync. + pub custody_sync_state: RwLock, /// The computed sampling subnets and columns is stored to avoid re-computing. pub sampling_subnets: RwLock>, /// Network-related configuration. Immutable after initialization. 
@@ -91,6 +94,9 @@ impl NetworkGlobals { gossipsub_subscriptions: RwLock::new(HashSet::new()), sync_state: RwLock::new(SyncState::Stalled), backfill_state: RwLock::new(BackFillState::Paused), + custody_sync_state: RwLock::new(CustodyBackFillState::Pending( + "Custody backfill sync initialized".to_string(), + )), sampling_subnets: RwLock::new(sampling_subnets), config, spec, diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index 0bbbcebaf2..3f57406fc7 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -10,7 +10,7 @@ pub type EnrSyncCommitteeBitfield = BitVector<::SyncCommitteeSu pub type Enr = discv5::enr::Enr; -pub use eth2::lighthouse::sync_state::{BackFillState, SyncState}; +pub use eth2::lighthouse::sync_state::{BackFillState, CustodyBackFillState, SyncState}; pub use globals::NetworkGlobals; pub use pubsub::{PubsubMessage, SnappyTransform}; pub use subnet::{Subnet, SubnetDiscovery}; diff --git a/beacon_node/lighthouse_tracing/src/lib.rs b/beacon_node/lighthouse_tracing/src/lib.rs index 18a9874252..56dccadaa9 100644 --- a/beacon_node/lighthouse_tracing/src/lib.rs +++ b/beacon_node/lighthouse_tracing/src/lib.rs @@ -26,7 +26,9 @@ pub const SPAN_PROCESS_RPC_BLOCK: &str = "process_rpc_block"; pub const SPAN_PROCESS_RPC_BLOBS: &str = "process_rpc_blobs"; pub const SPAN_PROCESS_RPC_CUSTODY_COLUMNS: &str = "process_rpc_custody_columns"; pub const SPAN_PROCESS_CHAIN_SEGMENT: &str = "process_chain_segment"; +pub const SPAN_CUSTODY_BACKFILL_SYNC_BATCH_REQUEST: &str = "custody_backfill_sync_batch_request"; pub const SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL: &str = "process_chain_segment_backfill"; +pub const SPAN_CUSTODY_BACKFILL_SYNC_IMPORT_COLUMNS: &str = "custody_backfill_sync_import_columns"; /// Fork choice root spans pub const SPAN_RECOMPUTE_HEAD: &str = "recompute_head_at_slot"; @@ -73,4 +75,6 @@ pub const LH_BN_ROOT_SPAN_NAMES: &[&str] = &[ 
SPAN_HANDLE_LIGHT_CLIENT_BOOTSTRAP, SPAN_HANDLE_LIGHT_CLIENT_OPTIMISTIC_UPDATE, SPAN_HANDLE_LIGHT_CLIENT_FINALITY_UPDATE, + SPAN_CUSTODY_BACKFILL_SYNC_BATCH_REQUEST, + SPAN_CUSTODY_BACKFILL_SYNC_IMPORT_COLUMNS, ]; diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index a2b5af8b08..cea06a28c8 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -212,6 +212,22 @@ pub static BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL: LazyLock, +> = LazyLock::new(|| { + try_create_int_counter( + "beacon_processor_custody_backfill_column_import_success_total", + "Total number of custody backfill sync columns successfully processed.", + ) +}); +pub static BEACON_PROCESSOR_CUSTODY_BACKFILL_BATCH_FAILED_TOTAL: LazyLock> = + LazyLock::new(|| { + try_create_int_counter( + "beacon_processor_custody_backfill_batch_failed_total", + "Total number of custody backfill batches that failed to be processed.", + ) + }); // Chain segments. pub static BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL: LazyLock> = LazyLock::new(|| { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 85ccde1d59..7441e92871 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -16,6 +16,7 @@ use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, LightClientUpdatesByRangeRequest, }; +use lighthouse_network::service::api_types::CustodyBackfillBatchId; use lighthouse_network::{ Client, MessageId, NetworkGlobals, PeerId, PubsubMessage, rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, @@ -492,6 +493,22 @@ impl NetworkBeaconProcessor { }) } + pub fn send_historic_data_columns( + self: &Arc, + batch_id: CustodyBackfillBatchId, + data_columns: DataColumnSidecarList, + ) -> 
Result<(), Error> { + let processor = self.clone(); + let process_fn = move || processor.process_historic_data_columns(batch_id, data_columns); + + let work = Work::ChainSegmentBackfill(Box::new(process_fn)); + + self.try_send(BeaconWorkEvent { + drop_during_sync: true, + work, + }) + } + /// Create a new work event to import `blocks` as a beacon chain segment. pub fn send_chain_segment( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 1d99540c29..41b12fa01b 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -1,6 +1,7 @@ use crate::metrics::{self, register_process_result_metrics}; use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProcessor}; use crate::sync::BatchProcessResult; +use crate::sync::manager::CustodyBatchProcessResult; use crate::sync::{ ChainId, manager::{BlockProcessType, SyncMessage}, @@ -8,6 +9,7 @@ use crate::sync::{ use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::data_availability_checker::MaybeAvailableBlock; +use beacon_chain::historical_data_columns::HistoricalDataColumnError; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer, validator_monitor::get_slot_delay_ms, @@ -18,15 +20,17 @@ use beacon_processor::{ }; use beacon_processor::{Work, WorkEvent}; use lighthouse_network::PeerAction; +use lighthouse_network::service::api_types::CustodyBackfillBatchId; use lighthouse_tracing::{ - SPAN_PROCESS_CHAIN_SEGMENT, SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL, SPAN_PROCESS_RPC_BLOBS, - SPAN_PROCESS_RPC_BLOCK, SPAN_PROCESS_RPC_CUSTODY_COLUMNS, + SPAN_CUSTODY_BACKFILL_SYNC_IMPORT_COLUMNS, SPAN_PROCESS_CHAIN_SEGMENT, + 
SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL, SPAN_PROCESS_RPC_BLOBS, SPAN_PROCESS_RPC_BLOCK, + SPAN_PROCESS_RPC_CUSTODY_COLUMNS, }; use logging::crit; use std::sync::Arc; use std::time::Duration; use store::KzgCommitment; -use tracing::{debug, error, info, instrument, warn}; +use tracing::{debug, debug_span, error, info, instrument, warn}; use types::beacon_block_body::format_kzg_commitments; use types::blob_sidecar::FixedBlobSidecarList; use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256}; @@ -418,6 +422,103 @@ impl NetworkBeaconProcessor { }); } + pub fn process_historic_data_columns( + &self, + batch_id: CustodyBackfillBatchId, + downloaded_columns: DataColumnSidecarList, + ) { + let _guard = debug_span!( + SPAN_CUSTODY_BACKFILL_SYNC_IMPORT_COLUMNS, + epoch = %batch_id.epoch, + columns_received_count = downloaded_columns.len() + ) + .entered(); + + let sent_columns = downloaded_columns.len(); + let result = match self + .chain + .import_historical_data_column_batch(batch_id.epoch, downloaded_columns) + { + Ok(imported_columns) => { + metrics::inc_counter_by( + &metrics::BEACON_PROCESSOR_CUSTODY_BACKFILL_COLUMN_IMPORT_SUCCESS_TOTAL, + imported_columns as u64, + ); + CustodyBatchProcessResult::Success { + sent_columns, + imported_columns, + } + } + Err(e) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_CUSTODY_BACKFILL_BATCH_FAILED_TOTAL, + ); + let peer_action: Option = match &e { + HistoricalDataColumnError::NoBlockFound { + data_column_block_root, + expected_block_root, + } => { + debug!( + error = "no_block_found", + ?data_column_block_root, + ?expected_block_root, + "Custody backfill batch processing error" + ); + // The peer is faulty if they send blocks with bad roots. + Some(PeerAction::LowToleranceError) + } + HistoricalDataColumnError::MissingDataColumns { .. } => { + warn!( + error = ?e, + "Custody backfill batch processing error", + ); + // The peer is faulty if they don't return data columns + // that they advertised as available. 
+ Some(PeerAction::LowToleranceError) + } + HistoricalDataColumnError::InvalidKzg => { + warn!( + error = ?e, + "Custody backfill batch processing error", + ); + // The peer is faulty if they don't return data columns + // with valid kzg commitments. + Some(PeerAction::LowToleranceError) + } + HistoricalDataColumnError::BeaconChainError(e) => { + match &**e { + beacon_chain::BeaconChainError::FailedColumnCustodyInfoUpdate => {} + _ => { + warn!( + error = ?e, + "Custody backfill batch processing error", + ); + } + } + + // This is an interal error, don't penalize the peer + None + } + HistoricalDataColumnError::IndexOutOfBounds => { + error!( + error = ?e, + "Custody backfill batch out of bounds error" + ); + // This should never occur, don't penalize the peer. + None + } + HistoricalDataColumnError::StoreError(e) => { + warn!(error = ?e, "Custody backfill batch processing error"); + // This is an internal error, don't penalize the peer. + None + } + }; + CustodyBatchProcessResult::Error { peer_action } + } + }; + self.send_sync_message(SyncMessage::CustodyBatchProcessed { result, batch_id }); + } + /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// thread if more blocks are needed to process it. #[instrument( diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 00597586b8..6c0cbd7e55 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -9,24 +9,27 @@ //! sync as failed, log an error and attempt to retry once a new peer joins the node. 
use crate::network_beacon_processor::ChainSegmentProcessId; +use crate::sync::batch::{ + BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, +}; use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::manager::BatchProcessResult; use crate::sync::network_context::{ RangeRequestId, RpcRequestSendError, RpcResponseError, SyncNetworkContext, }; -use crate::sync::range_sync::{ - BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, -}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::service::api_types::Id; use lighthouse_network::types::{BackFillState, NetworkGlobals}; use lighthouse_network::{PeerAction, PeerId}; use logging::crit; +use std::collections::hash_map::DefaultHasher; use std::collections::{ HashSet, btree_map::{BTreeMap, Entry}, }; +use std::hash::{Hash, Hasher}; +use std::marker::PhantomData; use std::sync::Arc; use tracing::{debug, error, info, warn}; use types::{ColumnIndex, Epoch, EthSpec}; @@ -49,21 +52,27 @@ const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 10; /// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 10; -/// Custom configuration for the batch object. -struct BackFillBatchConfig {} +type RpcBlocks = Vec>; -impl BatchConfig for BackFillBatchConfig { +type BackFillBatchInfo = BatchInfo, RpcBlocks>; + +type BackFillSyncBatches = BTreeMap>; + +/// Custom configuration for the batch object. 
+struct BackFillBatchConfig { + marker: PhantomData, +} + +impl BatchConfig for BackFillBatchConfig { fn max_batch_download_attempts() -> u8 { MAX_BATCH_DOWNLOAD_ATTEMPTS } fn max_batch_processing_attempts() -> u8 { MAX_BATCH_PROCESSING_ATTEMPTS } - fn batch_attempt_hash(blocks: &[RpcBlock]) -> u64 { - use std::collections::hash_map::DefaultHasher; - use std::hash::{Hash, Hasher}; + fn batch_attempt_hash(data: &D) -> u64 { let mut hasher = DefaultHasher::new(); - blocks.hash(&mut hasher); + data.hash(&mut hasher); hasher.finish() } } @@ -121,7 +130,7 @@ pub struct BackFillSync { last_batch_downloaded: bool, /// Sorted map of batches undergoing some kind of processing. - batches: BTreeMap>, + batches: BackFillSyncBatches, /// The current processing batch, if any. current_processing_batch: Option, @@ -349,7 +358,7 @@ impl BackFillSync { // reasons. Check that this block belongs to the expected peer // TODO(das): removed peer_id matching as the node may request a different peer for data // columns. - if !batch.is_expecting_block(&request_id) { + if !batch.is_expecting_request_id(&request_id) { return Ok(()); } debug!(batch_epoch = %batch_id, error = ?err, "Batch download failed"); @@ -393,12 +402,13 @@ impl BackFillSync { // sending an error /timeout) if the peer is removed from the chain for other // reasons. 
Check that this block belongs to the expected peer, and that the // request_id matches - if !batch.is_expecting_block(&request_id) { + if !batch.is_expecting_request_id(&request_id) { return Ok(ProcessResult::Successful); } + let received = blocks.len(); match batch.download_completed(blocks, *peer_id) { - Ok(received) => { + Ok(_) => { let awaiting_batches = self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH; debug!( @@ -1050,7 +1060,7 @@ impl BackFillSync { // only request batches up to the buffer size limit // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync // if the current processing window is contained in a long range of skip slots. - let in_buffer = |batch: &BatchInfo| { + let in_buffer = |batch: &BackFillBatchInfo| { matches!( batch.state(), BatchState::Downloading(..) | BatchState::AwaitingProcessing(..) diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/batch.rs similarity index 82% rename from beacon_node/network/src/sync/range_sync/batch.rs rename to beacon_node/network/src/sync/batch.rs index c79800bfbe..ea0ef15f4b 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/batch.rs @@ -2,29 +2,28 @@ use beacon_chain::block_verification_types::RpcBlock; use derivative::Derivative; use lighthouse_network::PeerId; use lighthouse_network::rpc::methods::BlocksByRangeRequest; +use lighthouse_network::rpc::methods::DataColumnsByRangeRequest; use lighthouse_network::service::api_types::Id; use std::collections::HashSet; -use std::fmt; -use std::hash::{Hash, Hasher}; +use std::hash::Hash; +use std::marker::PhantomData; use std::ops::Sub; -use std::time::{Duration, Instant}; +use std::time::Duration; +use std::time::Instant; use strum::Display; -use types::{Epoch, EthSpec, Slot}; +use types::Slot; +use types::{DataColumnSidecarList, Epoch, EthSpec}; -/// The number of times to retry a batch before it is considered 
failed. -const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; - -/// Invalid batches are attempted to be re-downloaded from other peers. If a batch cannot be processed -/// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. -const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; +pub type BatchId = Epoch; /// Type of expected batch. -#[derive(Debug, Copy, Clone, Display)] +#[derive(Debug, Clone, Display)] #[strum(serialize_all = "snake_case")] pub enum ByRangeRequestType { BlocksAndColumns, BlocksAndBlobs, Blocks, + Columns(HashSet), } /// Allows customisation of the above constants used in other sync methods such as BackFillSync. @@ -60,28 +59,10 @@ pub trait BatchConfig { /// Note that simpler hashing functions considered in the past (hash of first block, hash of last /// block, number of received blocks) are not good enough to differentiate attempts. For this /// reason, we hash the complete set of blocks both in RangeSync and BackFillSync. - fn batch_attempt_hash(blocks: &[RpcBlock]) -> u64; + fn batch_attempt_hash(data: &D) -> u64; } #[derive(Debug)] -pub struct RangeSyncBatchConfig {} - -impl BatchConfig for RangeSyncBatchConfig { - fn max_batch_download_attempts() -> u8 { - MAX_BATCH_DOWNLOAD_ATTEMPTS - } - fn max_batch_processing_attempts() -> u8 { - MAX_BATCH_PROCESSING_ATTEMPTS - } - fn batch_attempt_hash(blocks: &[RpcBlock]) -> u64 { - let mut hasher = std::collections::hash_map::DefaultHasher::new(); - blocks.hash(&mut hasher); - hasher.finish() - } -} - -/// Error type of a batch in a wrong state. -// Such errors should never be encountered. pub struct WrongState(pub(crate) String); /// After batch operations, we use this to communicate whether a batch can continue or not @@ -100,28 +81,30 @@ pub enum BatchProcessingResult { #[derive(Derivative)] #[derivative(Debug)] /// A segment of a chain. -pub struct BatchInfo { +pub struct BatchInfo { /// Start slot of the batch. start_slot: Slot, /// End slot of the batch. 
end_slot: Slot, /// The `Attempts` that have been made and failed to send us this batch. - failed_processing_attempts: Vec, + failed_processing_attempts: Vec>, /// Number of processing attempts that have failed but we do not count. non_faulty_processing_attempts: u8, /// The number of download retries this batch has undergone due to a failed request. failed_download_attempts: Vec>, /// State of the batch. - state: BatchState, + state: BatchState, /// Whether this batch contains all blocks or all blocks and blobs. batch_type: ByRangeRequestType, /// Pin the generic #[derivative(Debug = "ignore")] - marker: std::marker::PhantomData, + marker: std::marker::PhantomData<(E, B)>, } -impl fmt::Display for BatchInfo { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +impl std::fmt::Display + for BatchInfo +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, "Start Slot: {}, End Slot: {}, State: {}", @@ -132,21 +115,21 @@ impl fmt::Display for BatchInfo { #[derive(Display)] /// Current state of a batch -pub enum BatchState { +pub enum BatchState { /// The batch has failed either downloading or processing, but can be requested again. AwaitingDownload, /// The batch is being downloaded. Downloading(Id), /// The batch has been completely downloaded and is ready for processing. - AwaitingProcessing(PeerId, Vec>, Instant), + AwaitingProcessing(PeerId, D, Instant), /// The batch is being processed. - Processing(Attempt), + Processing(Attempt), /// The batch was successfully processed and is waiting to be validated. /// /// It is not sufficient to process a batch successfully to consider it correct. This is /// because batches could be erroneously empty, or incomplete. Therefore, a batch is considered /// valid, only if the next sequential batch imports at least a block. - AwaitingValidation(Attempt), + AwaitingValidation(Attempt), /// Intermediate state for inner state handling. 
Poisoned, /// The batch has maxed out the allowed attempts for either downloading or processing. It @@ -154,14 +137,14 @@ pub enum BatchState { Failed, } -impl BatchState { +impl BatchState { /// Helper function for poisoning a state. - pub fn poison(&mut self) -> BatchState { + pub fn poison(&mut self) -> BatchState { std::mem::replace(self, BatchState::Poisoned) } } -impl BatchInfo { +impl BatchInfo { /// Batches are downloaded excluding the first block of the epoch assuming it has already been /// downloaded. /// @@ -178,13 +161,13 @@ impl BatchInfo { pub fn new(start_epoch: &Epoch, num_of_epochs: u64, batch_type: ByRangeRequestType) -> Self { let start_slot = start_epoch.start_slot(E::slots_per_epoch()); let end_slot = start_slot + num_of_epochs * E::slots_per_epoch(); - BatchInfo { + Self { start_slot, end_slot, failed_processing_attempts: Vec::new(), failed_download_attempts: Vec::new(), non_faulty_processing_attempts: 0, - state: BatchState::AwaitingDownload, + state: BatchState::::AwaitingDownload, batch_type, marker: std::marker::PhantomData, } @@ -208,8 +191,8 @@ impl BatchInfo { peers } - /// Verifies if an incoming block belongs to this batch. - pub fn is_expecting_block(&self, request_id: &Id) -> bool { + /// Verifies if an incoming request id to this batch. + pub fn is_expecting_request_id(&self, request_id: &Id) -> bool { if let BatchState::Downloading(expected_id) = &self.state { return expected_id == request_id; } @@ -227,30 +210,6 @@ impl BatchInfo { } } - /// Returns the count of stored pending blocks if in awaiting processing state - pub fn pending_blocks(&self) -> usize { - match &self.state { - BatchState::AwaitingProcessing(_, blocks, _) => blocks.len(), - BatchState::AwaitingDownload - | BatchState::Downloading { .. } - | BatchState::Processing { .. } - | BatchState::AwaitingValidation { .. } - | BatchState::Poisoned - | BatchState::Failed => 0, - } - } - - /// Returns a BlocksByRange request associated with the batch. 
- pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) { - ( - BlocksByRangeRequest::new( - self.start_slot.into(), - self.end_slot.sub(self.start_slot).into(), - ), - self.batch_type, - ) - } - /// After different operations over a batch, this could be in a state that allows it to /// continue, or in failed state. When the batch has failed, we check if it did mainly due to /// processing failures. In this case the batch is considered failed and faulty. @@ -265,27 +224,22 @@ impl BatchInfo { } } - pub fn state(&self) -> &BatchState { + pub fn state(&self) -> &BatchState { &self.state } - pub fn attempts(&self) -> &[Attempt] { + pub fn attempts(&self) -> &[Attempt] { &self.failed_processing_attempts } - /// Marks the batch as ready to be processed if the blocks are in the range. The number of - /// received blocks is returned, or the wrong batch end on failure + /// Marks the batch as ready to be processed if the data columns are in the range. The number of + /// received columns is returned, or the wrong batch end on failure #[must_use = "Batch may have failed"] - pub fn download_completed( - &mut self, - blocks: Vec>, - peer: PeerId, - ) -> Result { + pub fn download_completed(&mut self, data_columns: D, peer: PeerId) -> Result<(), WrongState> { match self.state.poison() { BatchState::Downloading(_) => { - let received = blocks.len(); - self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now()); - Ok(received) + self.state = BatchState::AwaitingProcessing(peer, data_columns, Instant::now()); + Ok(()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { @@ -376,17 +330,17 @@ impl BatchInfo { } } - pub fn start_processing(&mut self) -> Result<(Vec>, Duration), WrongState> { + pub fn start_processing(&mut self) -> Result<(D, Duration), WrongState> { match self.state.poison() { - BatchState::AwaitingProcessing(peer, blocks, start_instant) => { - self.state = BatchState::Processing(Attempt::new::(peer, 
&blocks)); - Ok((blocks, start_instant.elapsed())) + BatchState::AwaitingProcessing(peer, data_columns, start_instant) => { + self.state = BatchState::Processing(Attempt::new::(peer, &data_columns)); + Ok((data_columns, start_instant.elapsed())) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { self.state = other; Err(WrongState(format!( - "Starting procesing batch in wrong state {:?}", + "Starting processing batch in wrong state {:?}", self.state ))) } @@ -466,37 +420,86 @@ impl BatchInfo { } } -/// Represents a peer's attempt and providing the result for this batch. -/// -/// Invalid attempts will downscore a peer. -#[derive(PartialEq, Debug)] -pub struct Attempt { +// BatchInfo implementations for RangeSync +impl BatchInfo>> { + /// Returns a BlocksByRange request associated with the batch. + pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) { + ( + BlocksByRangeRequest::new( + self.start_slot.into(), + self.end_slot.sub(self.start_slot).into(), + ), + self.batch_type.clone(), + ) + } + + /// Returns the count of stored pending blocks if in awaiting processing state + pub fn pending_blocks(&self) -> usize { + match &self.state { + BatchState::AwaitingProcessing(_, blocks, _) => blocks.len(), + BatchState::AwaitingDownload + | BatchState::Downloading { .. } + | BatchState::Processing { .. } + | BatchState::AwaitingValidation { .. } + | BatchState::Poisoned + | BatchState::Failed => 0, + } + } +} + +// BatchInfo implementation for CustodyBackFillSync +impl BatchInfo> { + /// Returns a DataColumnsByRange request associated with the batch. 
+ pub fn to_data_columns_by_range_request( + &self, + ) -> Result { + match &self.batch_type { + ByRangeRequestType::Columns(columns) => Ok(DataColumnsByRangeRequest { + start_slot: self.start_slot.into(), + count: self.end_slot.sub(self.start_slot).into(), + columns: columns.clone().into_iter().collect(), + }), + _ => Err(WrongState( + "Custody backfill sync can only make data columns by range requests.".to_string(), + )), + } + } +} + +#[derive(Debug)] +pub struct Attempt { /// The peer that made the attempt. pub peer_id: PeerId, /// The hash of the blocks of the attempt. pub hash: u64, + /// Pin the generic. + marker: PhantomData, } -impl Attempt { - fn new(peer_id: PeerId, blocks: &[RpcBlock]) -> Self { - let hash = B::batch_attempt_hash(blocks); - Attempt { peer_id, hash } +impl Attempt { + fn new(peer_id: PeerId, data: &D) -> Self { + let hash = B::batch_attempt_hash(data); + Attempt { + peer_id, + hash, + marker: PhantomData, + } } } -impl std::fmt::Debug for BatchState { +impl std::fmt::Debug for BatchState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - BatchState::Processing(Attempt { peer_id, hash: _ }) => { + BatchState::Processing(Attempt { peer_id, .. }) => { write!(f, "Processing({})", peer_id) } - BatchState::AwaitingValidation(Attempt { peer_id, hash: _ }) => { + BatchState::AwaitingValidation(Attempt { peer_id, .. }) => { write!(f, "AwaitingValidation({})", peer_id) } BatchState::AwaitingDownload => f.write_str("AwaitingDownload"), BatchState::Failed => f.write_str("Failed"), - BatchState::AwaitingProcessing(peer, blocks, _) => { - write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len()) + BatchState::AwaitingProcessing(peer, ..) 
=> { + write!(f, "AwaitingProcessing({})", peer) } BatchState::Downloading(request_id) => { write!(f, "Downloading({})", request_id) @@ -506,7 +509,7 @@ impl std::fmt::Debug for BatchState { } } -impl BatchState { +impl BatchState { /// Creates a character representation/visualization for the batch state to display in logs for quicker and /// easier recognition fn visualize(&self) -> char { diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index ba89d11225..cd9276f7e3 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -36,7 +36,7 @@ pub struct RangeBlockComponentsRequest { pub(crate) request_span: Span, } -enum ByRangeRequest { +pub enum ByRangeRequest { Active(I), Complete(T), } @@ -435,7 +435,7 @@ impl RangeBlockComponentsRequest { } impl ByRangeRequest { - fn finish(&mut self, id: I, data: T) -> Result<(), String> { + pub fn finish(&mut self, id: I, data: T) -> Result<(), String> { match self { Self::Active(expected_id) => { if expected_id != &id { @@ -448,7 +448,7 @@ impl ByRangeRequest { } } - fn to_finished(&self) -> Option<&T> { + pub fn to_finished(&self) -> Option<&T> { match self { Self::Active(_) => None, Self::Complete(data) => Some(data), @@ -467,7 +467,7 @@ mod tests { PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, - DataColumnsByRangeRequestId, Id, RangeRequestId, + DataColumnsByRangeRequestId, DataColumnsByRangeRequester, Id, RangeRequestId, }, }; use rand::SeedableRng; @@ -501,7 +501,7 @@ mod tests { fn columns_id( id: Id, - parent_request_id: ComponentsByRangeRequestId, + parent_request_id: DataColumnsByRangeRequester, ) -> DataColumnsByRangeRequestId { DataColumnsByRangeRequestId { id, @@ -598,7 +598,15 @@ mod tests { let columns_req_id = expects_custody_columns .iter() .enumerate() - .map(|(i, column)| (columns_id(i as Id, 
components_id), vec![*column])) + .map(|(i, column)| { + ( + columns_id( + i as Id, + DataColumnsByRangeRequester::ComponentsByRange(components_id), + ), + vec![*column], + ) + }) .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, @@ -657,7 +665,15 @@ mod tests { let columns_req_id = batched_column_requests .iter() .enumerate() - .map(|(i, columns)| (columns_id(i as Id, components_id), columns.clone())) + .map(|(i, columns)| { + ( + columns_id( + i as Id, + DataColumnsByRangeRequester::ComponentsByRange(components_id), + ), + columns.clone(), + ) + }) .collect::>(); let mut info = RangeBlockComponentsRequest::::new( @@ -738,7 +754,15 @@ mod tests { let columns_req_id = expected_custody_columns .iter() .enumerate() - .map(|(i, column)| (columns_id(i as Id, components_id), vec![*column])) + .map(|(i, column)| { + ( + columns_id( + i as Id, + DataColumnsByRangeRequester::ComponentsByRange(components_id), + ), + vec![*column], + ) + }) .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, @@ -816,7 +840,15 @@ mod tests { let columns_req_id = expected_custody_columns .iter() .enumerate() - .map(|(i, column)| (columns_id(i as Id, components_id), vec![*column])) + .map(|(i, column)| { + ( + columns_id( + i as Id, + DataColumnsByRangeRequester::ComponentsByRange(components_id), + ), + vec![*column], + ) + }) .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, @@ -852,7 +884,10 @@ mod tests { assert!(result.is_err()); // AND: We retry with a new peer for the failed column - let new_columns_req_id = columns_id(10 as Id, components_id); + let new_columns_req_id = columns_id( + 10 as Id, + DataColumnsByRangeRequester::ComponentsByRange(components_id), + ); let failed_column_requests = vec![(new_columns_req_id, vec![2])]; info.reinsert_failed_column_requests(failed_column_requests) .unwrap(); @@ -898,7 +933,15 @@ mod tests { let columns_req_id = expected_custody_columns .iter() .enumerate() - 
.map(|(i, column)| (columns_id(i as Id, components_id), vec![*column])) + .map(|(i, column)| { + ( + columns_id( + i as Id, + DataColumnsByRangeRequester::ComponentsByRange(components_id), + ), + vec![*column], + ) + }) .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, diff --git a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs new file mode 100644 index 0000000000..69df3422e6 --- /dev/null +++ b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs @@ -0,0 +1,1126 @@ +use std::{ + collections::{BTreeMap, HashSet, btree_map::Entry}, + marker::PhantomData, + sync::Arc, +}; + +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use lighthouse_network::{ + NetworkGlobals, PeerAction, PeerId, + service::api_types::{CustodyBackFillBatchRequestId, CustodyBackfillBatchId}, + types::CustodyBackFillState, +}; +use lighthouse_tracing::SPAN_CUSTODY_BACKFILL_SYNC_BATCH_REQUEST; +use logging::crit; +use std::hash::{DefaultHasher, Hash, Hasher}; +use tracing::{debug, error, info, info_span, warn}; +use types::{DataColumnSidecarList, Epoch, EthSpec}; + +use crate::sync::{ + backfill_sync::{BACKFILL_EPOCHS_PER_BATCH, ProcessResult, SyncStart}, + batch::{ + BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, + ByRangeRequestType, + }, + block_sidecar_coupling::CouplingError, + manager::CustodyBatchProcessResult, + network_context::{RpcResponseError, SyncNetworkContext}, +}; + +/// The maximum number of batches to queue before requesting more. +const BACKFILL_BATCH_BUFFER_SIZE: u8 = 5; + +/// Columns are downloaded in batches from peers. This constant specifies how many epochs worth of +/// columns per batch are requested _at most_. A batch may request less columns to account for +/// already requested columns. There is a timeout for each batch request. If this value is too high, +/// we will negatively report peers with poor bandwidth. 
This can be set arbitrarily high, in which +/// case the responder will fill the response up to the max request size, assuming they have the +/// bandwidth to do so. +pub const CUSTODY_BACKFILL_EPOCHS_PER_BATCH: u64 = 1; + +type CustodyBackFillBatchInfo = + BatchInfo, DataColumnSidecarList>; +type CustodyBackFillBatches = BTreeMap>; + +#[derive(Debug)] +pub struct CustodyBackFillBatchConfig { + marker: PhantomData, +} + +impl BatchConfig for CustodyBackFillBatchConfig { + fn max_batch_download_attempts() -> u8 { + 5 + } + fn max_batch_processing_attempts() -> u8 { + 5 + } + fn batch_attempt_hash(data: &D) -> u64 { + let mut hasher = DefaultHasher::new(); + data.hash(&mut hasher); + hasher.finish() + } +} + +/// The ways a custody backfill sync can fail. +// The info in the enum variants is displayed in logging, clippy thinks it's dead code. +#[derive(Debug)] +pub enum CustodyBackfillError { + /// A batch failed to be downloaded. + BatchDownloadFailed(#[allow(dead_code)] BatchId), + /// A batch could not be processed. + BatchProcessingFailed(#[allow(dead_code)] BatchId), + /// A batch entered an invalid state. + BatchInvalidState(#[allow(dead_code)] BatchId, #[allow(dead_code)] String), + /// The sync algorithm entered an invalid state. + InvalidSyncState(#[allow(dead_code)] String), + /// The chain became paused. + Paused, +} + +pub struct CustodyBackFillSync { + /// Keeps track of the current progress of the custody backfill. + /// This only gets refreshed from the beacon chain if we enter a failed state. + current_start: BatchId, + + /// Starting epoch of the batch that needs to be processed next. + /// This is incremented as the chain advances. + processing_target: BatchId, + + /// The custody group count we are trying to fulfill up to the DA window. + /// This is used as an indicator to restart custody backfill sync if the cgc + /// was changed in the middle of a currently active sync. + cgc: u64, + + /// Run ID of this backfill process. 
Increments if sync restarts. Used to differentiate batch + /// results from different runs. + run_id: u64, + + /// Starting epoch of the next batch that needs to be downloaded. + to_be_downloaded: BatchId, + + /// Keeps track if we have requested the final batch. + last_batch_downloaded: bool, + + /// Sorted map of batches undergoing some kind of processing. + batches: CustodyBackFillBatches, + + /// The current processing batch, if any. + current_processing_batch: Option, + + /// Batches validated. + validated_batches: u64, + + /// These are batches that we've skipped because we have no columns to fetch for the epoch. + skipped_batches: HashSet, + + /// When a custody backfill sync fails, we keep track of whether a new fully synced peer has joined. + /// This signifies that we are able to attempt to restart a failed chain. + restart_failed_sync: bool, + + /// Reference to the beacon chain to obtain initial starting points for custody backfill sync. + beacon_chain: Arc>, + + /// Reference to the network globals in order to obtain valid peers to backfill columns from + /// (i.e synced peers). + network_globals: Arc>, +} + +impl CustodyBackFillSync { + pub fn new( + beacon_chain: Arc>, + network_globals: Arc>, + ) -> Self { + Self { + current_start: Epoch::new(0), + processing_target: Epoch::new(0), + cgc: 0, + run_id: 0, + to_be_downloaded: Epoch::new(0), + last_batch_downloaded: false, + batches: BTreeMap::new(), + skipped_batches: HashSet::new(), + current_processing_batch: None, + validated_batches: 0, + restart_failed_sync: false, + beacon_chain, + network_globals, + } + } + + /// Pauses the custody sync if it's currently syncing. 
+ pub fn pause(&mut self, reason: String) { + if let CustodyBackFillState::Syncing = self.state() { + debug!(processed_epochs = %self.validated_batches, to_be_processed = %self.current_start,"Custody backfill sync paused"); + self.set_state(CustodyBackFillState::Pending(reason)); + } + } + + /// Checks if custody backfill sync should start and sets the missing columns + /// custody backfill sync will attempt to fetch. + /// The criteria to start custody sync is: + /// - The earliest data column epoch's custodied columns != previous epoch's custodied columns + /// - The earliest data column epoch is a finalied epoch + pub fn should_start_custody_backfill_sync(&mut self) -> bool { + let Some(da_boundary_epoch) = self.beacon_chain.get_column_da_boundary() else { + return false; + }; + + // This is the epoch in which we have met our current custody requirements + let Some(earliest_data_column_epoch) = + self.beacon_chain.earliest_custodied_data_column_epoch() + else { + return false; + }; + + // Check if we have missing columns between the da boundary and `earliest_data_column_epoch` + let missing_columns = self + .beacon_chain + .get_missing_columns_for_epoch(da_boundary_epoch); + + if !missing_columns.is_empty() { + let latest_finalized_epoch = self + .beacon_chain + .canonical_head + .cached_head() + .finalized_checkpoint() + .epoch; + + // Check that the earliest data column epoch is a finalized epoch. + return earliest_data_column_epoch <= latest_finalized_epoch; + } + + false + } + + fn restart_sync(&mut self) { + // Set state to paused + self.set_state(CustodyBackFillState::Pending( + "CGC count has changed and custody backfill sync needs to restart".to_string(), + )); + + // Remove all batches and active requests. 
+ self.batches.clear(); + self.skipped_batches.clear(); + self.restart_failed_sync = false; + + // Reset all downloading and processing targets + // NOTE: Lets keep validated_batches for posterity + self.processing_target = Epoch::new(0); + self.to_be_downloaded = Epoch::new(0); + self.last_batch_downloaded = false; + self.current_processing_batch = None; + self.validated_batches = 0; + self.run_id += 1; + + self.set_start_epoch(); + self.set_cgc(); + } + + fn restart_if_required(&mut self) -> bool { + let cgc_at_head = self + .beacon_chain + .data_availability_checker + .custody_context() + .custody_group_count_at_head(&self.beacon_chain.spec); + + if cgc_at_head != self.cgc { + self.restart_sync(); + return true; + } + + false + } + + /// Starts syncing. + #[must_use = "A failure here indicates custody backfill sync has failed and the global sync state should be updated"] + pub fn start( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result { + match self.state() { + CustodyBackFillState::Syncing => { + if self.restart_if_required() { + return Ok(SyncStart::NotSyncing); + } + + if self.check_completed() { + self.set_state(CustodyBackFillState::Completed); + return Ok(SyncStart::NotSyncing); + } + } + CustodyBackFillState::Pending(_) | CustodyBackFillState::Completed => { + if self.check_completed() { + self.set_state(CustodyBackFillState::Completed); + return Ok(SyncStart::NotSyncing); + } + self.set_cgc(); + + if !self.should_start_custody_backfill_sync() { + return Ok(SyncStart::NotSyncing); + } + self.set_start_epoch(); + if self + .network_globals + .peers + .read() + .synced_peers() + .next() + .is_some() + { + debug!( + run_id = self.run_id, + current_start = %self.current_start, + processing_target = %self.processing_target, + to_be_downloaded = %self.to_be_downloaded, + "Starting custody backfill sync" + ); + // If there are peers to resume with, begin the resume. 
+ self.set_state(CustodyBackFillState::Syncing); + // Resume any previously failed batches. + self.resume_batches(network)?; + // begin requesting blocks from the peer pool, until all peers are exhausted. + self.request_batches(network)?; + + // start processing batches if needed + self.process_completed_batches(network)?; + } else { + return Ok(SyncStart::NotSyncing); + } + } + } + + let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + return Ok(SyncStart::NotSyncing); + }; + + Ok(SyncStart::Syncing { + completed: (self.validated_batches + * CUSTODY_BACKFILL_EPOCHS_PER_BATCH + * T::EthSpec::slots_per_epoch()) as usize, + remaining: self + .current_start + .end_slot(T::EthSpec::slots_per_epoch()) + .saturating_sub(column_da_boundary.start_slot(T::EthSpec::slots_per_epoch())) + .as_usize(), + }) + } + + fn set_cgc(&mut self) { + self.cgc = self + .beacon_chain + .data_availability_checker + .custody_context() + .custody_group_count_at_head(&self.beacon_chain.spec); + } + + fn set_start_epoch(&mut self) { + let earliest_data_column_epoch = self + .beacon_chain + .earliest_custodied_data_column_epoch() + .unwrap_or(Epoch::new(0)); + + self.current_start = earliest_data_column_epoch + 1; + self.processing_target = self.current_start; + self.to_be_downloaded = self.current_start; + } + + /// Attempts to request the next required batches from the peer pool. It will exhaust the peer + /// pool and left over batches until the batch buffer is reached or all peers are exhausted. 
+ fn request_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result<(), CustodyBackfillError> { + if !matches!(self.state(), CustodyBackFillState::Syncing) { + return Ok(()); + } + + // find the next pending batch and request it from the peer + // Note: for this function to not infinite loop we must: + // - If `include_next_batch` returns Some we MUST increase the count of batches that are + // accounted in the `BACKFILL_BATCH_BUFFER_SIZE` limit in the `matches!` statement of + // that function. + while let Some(batch_id) = self.include_next_batch() { + // send the batch + self.send_batch(network, batch_id)?; + } + + // No more batches, simply stop + Ok(()) + } + + /// When resuming a chain, this function searches for batches that need to be re-downloaded and + /// transitions their state to redownload the batch. + fn resume_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result<(), CustodyBackfillError> { + let batch_ids_to_retry = self + .batches + .iter() + .filter_map(|(batch_id, batch)| { + // In principle there should only ever be on of these, and we could terminate the + // loop early, however the processing is negligible and we continue the search + // for robustness to handle potential future modification + if matches!(batch.state(), BatchState::AwaitingDownload) { + Some(*batch_id) + } else { + None + } + }) + .collect::>(); + + for batch_id in batch_ids_to_retry { + self.send_batch(network, batch_id)?; + } + Ok(()) + } + + /// Creates the next required batch from the chain. If there are no more batches required, + /// `None` is returned. + fn include_next_batch(&mut self) -> Option { + let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + return None; + }; + + let mut missing_columns = HashSet::new(); + + // Skip all batches (Epochs) that don't have missing columns. 
+ for epoch in Epoch::range_inclusive_rev(self.to_be_downloaded, column_da_boundary) { + missing_columns = self.beacon_chain.get_missing_columns_for_epoch(epoch); + + if !missing_columns.is_empty() { + self.to_be_downloaded = epoch; + break; + } + + // This batch is being skipped, insert it into the skipped batches mapping. + self.skipped_batches.insert(epoch); + + if epoch == column_da_boundary { + return None; + } + } + + // Don't request batches before the column da boundary + if self.to_be_downloaded < column_da_boundary { + return None; + } + + // Don't request batches beyond the DA window + if self.last_batch_downloaded { + return None; + } + + // Only request batches up to the buffer size limit + // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync + // if the current processing window is contained in a long range of skip slots. + let in_buffer = |batch: &CustodyBackFillBatchInfo| { + matches!( + batch.state(), + BatchState::Downloading(..) | BatchState::AwaitingProcessing(..) 
+ ) + }; + if self + .batches + .iter() + .filter(|&(_epoch, batch)| in_buffer(batch)) + .count() + > BACKFILL_BATCH_BUFFER_SIZE as usize + { + return None; + } + + let batch_id = self.to_be_downloaded; + + match self.batches.entry(batch_id) { + Entry::Occupied(_) => { + // this batch doesn't need downloading, let this same function decide the next batch + if self.would_complete(batch_id) { + self.last_batch_downloaded = true; + } + + self.to_be_downloaded = self + .to_be_downloaded + .saturating_sub(CUSTODY_BACKFILL_EPOCHS_PER_BATCH); + self.include_next_batch() + } + Entry::Vacant(entry) => { + entry.insert(BatchInfo::new( + &batch_id, + CUSTODY_BACKFILL_EPOCHS_PER_BATCH, + ByRangeRequestType::Columns(missing_columns), + )); + if self.would_complete(batch_id) { + self.last_batch_downloaded = true; + } + self.to_be_downloaded = self + .to_be_downloaded + .saturating_sub(CUSTODY_BACKFILL_EPOCHS_PER_BATCH); + Some(batch_id) + } + } + } + + /// Processes the batch with the given id. + /// The batch must exist and be ready for processing + fn process_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + ) -> Result { + // Check if we need to restart custody backfill sync due to a recent cgc change + if self.restart_if_required() { + return Ok(ProcessResult::Successful); + } + + if self.state() != CustodyBackFillState::Syncing || self.current_processing_batch.is_some() + { + return Ok(ProcessResult::Successful); + } + + let Some(batch) = self.batches.get_mut(&batch_id) else { + return self + .fail_sync(CustodyBackfillError::InvalidSyncState(format!( + "Trying to process a batch that does not exist: {}", + batch_id + ))) + .map(|_| ProcessResult::Successful); + }; + + let (data_columns, _) = match batch.start_processing() { + Err(e) => { + return self + .fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0)) + .map(|_| ProcessResult::Successful); + } + Ok(v) => v, + }; + + self.current_processing_batch = Some(batch_id); + + if let 
Err(e) = network.beacon_processor().send_historic_data_columns( + CustodyBackfillBatchId { + epoch: batch_id, + run_id: self.run_id, + }, + data_columns, + ) { + crit!( + msg = "process_batch", + error = %e, + batch = ?self.processing_target, + "Failed to send data columns to processor." + ); + // This is unlikely to happen but it would stall syncing since the batch now has no + // data columns to continue, and the chain is expecting a processing result that won't + // arrive. To mitigate this, (fake) fail this processing so that the batch is + // re-downloaded. + self.on_batch_process_result( + network, + CustodyBackfillBatchId { + epoch: batch_id, + run_id: self.run_id, + }, + &CustodyBatchProcessResult::Error { peer_action: None }, + ) + } else { + Ok(ProcessResult::Successful) + } + } + + /// A data column has been received for a batch. + /// If the column correctly completes the batch it will be processed if possible. + /// If this returns an error, custody sync has failed and will be restarted once new peers + /// join the system. + /// The sync manager should update the global sync state on failure. + #[must_use = "A failure here indicates custody backfill sync has failed and the global sync state should be updated"] + pub fn on_data_column_response( + &mut self, + network: &mut SyncNetworkContext, + req_id: CustodyBackFillBatchRequestId, + peer_id: &PeerId, + resp: Result, RpcResponseError>, + ) -> Result { + if req_id.batch_id.run_id != self.run_id { + debug!(%req_id, "Ignoring custody backfill download response from different run_id"); + return Ok(ProcessResult::Successful); + } + + let batch_id = req_id.batch_id.epoch; + // check if we have this batch + let Some(batch) = self.batches.get_mut(&batch_id) else { + if !matches!(self.state(), CustodyBackFillState::Pending(_)) { + // A batch might get removed when custody sync advances, so this is non fatal. 
+ debug!(epoch = %batch_id, "Received a column for unknown batch"); + } + return Ok(ProcessResult::Successful); + }; + + // A batch could be retried without the peer failing the request (disconnecting/ + // sending an error /timeout) if the peer is removed for other + // reasons. Check that this column belongs to the expected peer, and that the + // request_id matches + if !batch.is_expecting_request_id(&req_id.id) { + return Ok(ProcessResult::Successful); + } + + match resp { + Ok(data_columns) => { + let received = data_columns.len(); + + match batch.download_completed(data_columns, *peer_id) { + Ok(_) => { + let awaiting_batches = self.processing_target.saturating_sub(batch_id) + / CUSTODY_BACKFILL_EPOCHS_PER_BATCH; + debug!( + %req_id, + blocks = received, + %awaiting_batches, + "Completed batch received" + ); + + // pre-emptively request more columns from peers whilst we process current columns. + self.request_batches(network)?; + self.process_completed_batches(network) + } + Err(e) => { + self.fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0))?; + Ok(ProcessResult::Successful) + } + } + } + Err(err) => { + debug!(batch_epoch = %batch_id, error = ?err, "Batch download failed"); + + // If there are any coupling errors, penalize the appropriate peers + if let RpcResponseError::BlockComponentCouplingError(coupling_error) = err + && let CouplingError::DataColumnPeerFailure { + error, + faulty_peers, + exceeded_retries: _, + } = coupling_error + { + for (column_index, faulty_peer) in faulty_peers { + debug!( + ?error, + ?column_index, + ?faulty_peer, + "Custody backfill sync penalizing peer" + ); + network.report_peer( + faulty_peer, + PeerAction::LowToleranceError, + "Peer failed to serve column", + ); + } + } + + match batch.download_failed(Some(*peer_id)) { + Err(e) => { + self.fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0))?; + } + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + 
self.fail_sync(CustodyBackfillError::BatchDownloadFailed(batch_id))?; + } + Ok(BatchOperationOutcome::Continue) => { + self.send_batch(network, batch_id)?; + } + } + Ok(ProcessResult::Successful) + } + } + } + + /// The beacon processor has completed processing a batch. This function handles the result + /// of the batch processor. + /// If an error is returned custody backfill sync has failed. + #[must_use = "A failure here indicates custody backfill sync has failed and the global sync state should be updated"] + pub fn on_batch_process_result( + &mut self, + network: &mut SyncNetworkContext, + custody_batch_id: CustodyBackfillBatchId, + result: &CustodyBatchProcessResult, + ) -> Result { + let batch_id = custody_batch_id.epoch; + if custody_batch_id.run_id != self.run_id { + debug!(batch = %custody_batch_id, "Ignoring custody backfill error from different run_id"); + return Ok(ProcessResult::Successful); + } + + // The first two cases are possible in regular sync, should not occur in custody backfill, but we + // keep this logic for handling potential processing race conditions. + // result + let batch = match &self.current_processing_batch { + Some(processing_id) if *processing_id != batch_id => { + debug!( + batch_epoch = %batch_id, + expected_batch_epoch = processing_id.as_u64(), + "Unexpected batch result" + ); + return Ok(ProcessResult::Successful); + } + None => { + debug!(%batch_id, "Chain was not expecting a batch result"); + return Ok(ProcessResult::Successful); + } + _ => { + // batch_id matches, continue + self.current_processing_batch = None; + + match self.batches.get_mut(&batch_id) { + Some(batch) => batch, + None => { + // This is an error. Fail the sync algorithm. 
+ return self + .fail_sync(CustodyBackfillError::InvalidSyncState(format!( + "Current processing batch not found: {}", + batch_id + ))) + .map(|_| ProcessResult::Successful); + } + } + } + }; + + let Some(peer) = batch.processing_peer() else { + self.fail_sync(CustodyBackfillError::BatchInvalidState( + batch_id, + String::from("Peer does not exist"), + ))?; + return Ok(ProcessResult::Successful); + }; + + debug!( + ?result, + batch_id = %custody_batch_id, + %peer, + client = %network.client_type(peer), + "Custody backfill batch processed" + ); + + match result { + CustodyBatchProcessResult::Success { + imported_columns, .. + } => { + if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) { + self.fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0))?; + } + + debug!(imported_count=?imported_columns, "Succesfully imported historical data columns"); + + self.advance_custody_backfill_sync(batch_id); + + let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + return Err(CustodyBackfillError::InvalidSyncState( + "Can't calculate column data availability boundary".to_string(), + )); + }; + + if batch_id == self.processing_target { + // Advance processing target to the previous epoch + // If the current processing target is above the column DA boundary + if self.processing_target > column_da_boundary { + self.processing_target = self + .processing_target + .saturating_sub(CUSTODY_BACKFILL_EPOCHS_PER_BATCH); + } + } + + // check if custody sync has completed syncing up to the DA window + if self.check_completed() { + info!( + validated_epochs = ?self.validated_batches, + run_id = self.run_id, + "Custody backfill sync completed" + ); + self.batches.clear(); + self.restart_failed_sync = false; + self.processing_target = self.current_start; + self.to_be_downloaded = self.current_start; + self.last_batch_downloaded = false; + self.current_processing_batch = None; + self.validated_batches = 0; + 
self.skipped_batches.clear(); + self.set_state(CustodyBackFillState::Completed); + self.beacon_chain.update_data_column_custody_info(None); + Ok(ProcessResult::SyncCompleted) + } else { + // custody sync is not completed + // attempt to request more batches + self.request_batches(network)?; + // attempt to process more batches + self.process_completed_batches(network) + } + } + CustodyBatchProcessResult::Error { peer_action } => { + match peer_action { + // Faulty failure + Some(peer_action) => { + match batch.processing_completed(BatchProcessingResult::FaultyFailure) { + Err(e) => { + // Batch was in the wrong state + self.fail_sync(CustodyBackfillError::BatchInvalidState( + batch_id, e.0, + )) + .map(|_| ProcessResult::Successful) + } + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + warn!( + score_adjustment = ?peer_action, + batch_epoch = %batch_id, + "Custody backfill batch failed to download. Penalizing peers" + ); + self.fail_sync(CustodyBackfillError::BatchProcessingFailed( + batch_id, + )) + .map(|_| ProcessResult::Successful) + } + + Ok(BatchOperationOutcome::Continue) => { + self.advance_custody_backfill_sync(batch_id); + // Handle this invalid batch, that is within the re-process retries limit. + self.handle_invalid_batch(network, batch_id) + .map(|_| ProcessResult::Successful) + } + } + } + // Non faulty failure + None => { + if let Err(e) = + batch.processing_completed(BatchProcessingResult::NonFaultyFailure) + { + self.fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0))?; + } + self.send_batch(network, batch_id)?; + Ok(ProcessResult::Successful) + } + } + } + } + } + + /// Processes the next ready batch. 
+ fn process_completed_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result { + // Only process batches if custody backfill is syncing and only process one batch at a time + if self.state() != CustodyBackFillState::Syncing || self.current_processing_batch.is_some() + { + return Ok(ProcessResult::Successful); + } + + // Don't try to process batches before the Fulu fork epoch since data columns don't exist + if let Some(fulu_fork_epoch) = self.beacon_chain.spec.fulu_fork_epoch + && self.processing_target < fulu_fork_epoch + { + return Ok(ProcessResult::Successful); + } + + // Check if we need to restart custody backfill sync due to a cgc change. + if self.restart_if_required() { + return Ok(ProcessResult::Successful); + } + + while self.skipped_batches.contains(&self.processing_target) { + self.skipped_batches.remove(&self.processing_target); + // Update data column custody info with the skipped batch + if let Err(e) = self + .beacon_chain + .safely_backfill_data_column_custody_info(self.processing_target) + { + // I can't see a scenario where this could happen, but if we don't + // handle this edge case custody backfill sync could be stuck indefinitely. + error!( + error=?e, + "Unable to update data column custody info, restarting sync" + ); + self.restart_sync(); + }; + self.processing_target -= BACKFILL_EPOCHS_PER_BATCH; + } + + // Find the id of the batch we are going to process. + if let Some(batch) = self.batches.get(&self.processing_target) { + let state = batch.state(); + match state { + BatchState::AwaitingProcessing(..) => { + return self.process_batch(network, self.processing_target); + } + BatchState::Downloading(..) => { + // Batch is not ready, nothing to process + } + // Batches can be in `AwaitingDownload` state if there weren't good data column subnet + // peers to send the request to. + BatchState::AwaitingDownload => return Ok(ProcessResult::Successful), + BatchState::AwaitingValidation(..) 
=> { + // The batch is validated + } + BatchState::Poisoned => unreachable!("Poisoned batch"), + BatchState::Failed | BatchState::Processing(_) => { + // these are all inconsistent states: + // - Failed -> non recoverable batch. Columns should have been removed + // - AwaitingDownload -> A recoverable failed batch should have been + // re-requested. + // - Processing -> `self.current_processing_batch` is None + self.fail_sync(CustodyBackfillError::InvalidSyncState(String::from( + "Invalid expected batch state", + )))?; + return Ok(ProcessResult::Successful); + } + } + } else { + self.fail_sync(CustodyBackfillError::InvalidSyncState(format!( + "Batch not found for current processing target {}", + self.processing_target + )))?; + return Ok(ProcessResult::Successful); + } + Ok(ProcessResult::Successful) + } + + /// Removes any batches previous to the given `validating_epoch` and advance custody backfill sync + /// to `validating_epoch`. + /// + /// The `validating_epoch` must align with batch boundaries. + fn advance_custody_backfill_sync(&mut self, validating_epoch: Epoch) { + let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + return; + }; + // make sure this epoch produces an advancement, unless its at the column DA boundary + if validating_epoch >= self.current_start && validating_epoch > column_da_boundary { + return; + } + + // We can now validate higher batches than the current batch. Here we remove all + // batches that are higher than the current batch. We add on an extra + // `BACKFILL_EPOCHS_PER_BATCH` as `split_off` is inclusive. + let removed_batches = self + .batches + .split_off(&(validating_epoch + CUSTODY_BACKFILL_EPOCHS_PER_BATCH)); + + for (id, batch) in removed_batches.into_iter() { + self.validated_batches = self.validated_batches.saturating_add(1); + match batch.state() { + BatchState::Downloading(..) | BatchState::AwaitingValidation(..) 
=> {} + BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => { + crit!("Batch indicates inconsistent data columns while advancing custody sync") + } + BatchState::AwaitingProcessing(..) => {} + BatchState::Processing(_) => { + debug!(batch = %id, %batch, "Advancing custody sync while processing a batch"); + if let Some(processing_id) = self.current_processing_batch + && id >= processing_id + { + self.current_processing_batch = None; + } + } + } + } + + self.processing_target = self.processing_target.min(validating_epoch); + self.current_start = self.current_start.min(validating_epoch); + self.to_be_downloaded = self.to_be_downloaded.min(validating_epoch); + + if self.batches.contains_key(&self.to_be_downloaded) { + // if custody backfill sync is advanced by Range beyond the previous `self.to_be_downloaded`, we + // won't have this batch, so we need to request it. + self.to_be_downloaded -= CUSTODY_BACKFILL_EPOCHS_PER_BATCH; + } + debug!(?validating_epoch, processing_target = ?self.processing_target, "Custody backfill advanced"); + } + + /// An invalid batch has been received that could not be processed, but that can be retried. + /// + /// These events occur when a peer has successfully responded with columns, but the columns + /// received are incorrect or invalid. This indicates the peer has not performed as + /// intended and can result in down voting a peer. + fn handle_invalid_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + ) -> Result<(), CustodyBackfillError> { + // The current batch could not be processed, indicating either the current or previous + // batches are invalid. + + // The previous batch could be incomplete due to the columns being too large to fit in + // a single RPC request or there could be consecutive empty batches which are not supposed + // to be there + + // The current (sub-optimal) strategy is to simply re-request all batches that could + // potentially be faulty. 
If a batch returns a different result than the original and + // results in successful processing, we downvote the original peer that sent us the batch. + + // this is our robust `processing_target`. All previous batches must be awaiting + // validation + let mut redownload_queue = Vec::new(); + + for (id, _) in self.batches.iter_mut().filter(|&(&id, _)| id > batch_id) { + redownload_queue.push(*id); + } + + // no batch maxed out its process attempts, so now the chain's volatile progress must be + // reset + self.processing_target = self.current_start; + + for id in redownload_queue { + self.send_batch(network, id)?; + } + // finally, re-request the failed batch. + self.send_batch(network, batch_id) + } + + /// Checks with the beacon chain if custody sync has completed. + fn check_completed(&mut self) -> bool { + if self.would_complete(self.current_start) { + // Check that the data column custody info `earliest_available_slot` + // is in an epoch that is less than or equal to the current DA boundary + let Some(earliest_data_column_epoch) = + self.beacon_chain.earliest_custodied_data_column_epoch() + else { + return false; + }; + + let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + return false; + }; + + return earliest_data_column_epoch <= column_da_boundary; + } + false + } + + /// Checks if custody backfill would complete by syncing to `start_epoch`. + fn would_complete(&self, start_epoch: Epoch) -> bool { + let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + return false; + }; + start_epoch <= column_da_boundary + } + + /// Requests the batch assigned to the given id from a given peer.
+ fn send_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + ) -> Result<(), CustodyBackfillError> { + let span = info_span!(SPAN_CUSTODY_BACKFILL_SYNC_BATCH_REQUEST); + let _enter = span.enter(); + + if let Some(batch) = self.batches.get_mut(&batch_id) { + let synced_peers = self + .network_globals + .peers + .read() + .synced_peers_for_epoch(batch_id) + .cloned() + .collect::>(); + + let request = batch.to_data_columns_by_range_request().map_err(|_| { + CustodyBackfillError::InvalidSyncState( + "Can't convert to data column by range request".to_string(), + ) + })?; + let failed_peers = batch.failed_peers(); + + match network.custody_backfill_data_columns_batch_request( + request, + CustodyBackfillBatchId { + epoch: batch_id, + run_id: self.run_id, + }, + &synced_peers, + &failed_peers, + ) { + Ok(request_id) => { + // inform the batch about the new request + if let Err(e) = batch.start_downloading(request_id.id) { + return self + .fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0)); + } + debug!(epoch = %batch_id, %batch, "Requesting batch"); + + return Ok(()); + } + Err(e) => match e { + crate::sync::network_context::RpcRequestSendError::NoPeer(no_peer) => { + // If we are here we have no more synced peers + debug!( + "reason" = format!("insufficient_synced_peers({no_peer:?})"), + "Custody sync paused" + ); + self.pause("Insufficient peers".to_string()); + return Err(CustodyBackfillError::Paused); + } + crate::sync::network_context::RpcRequestSendError::InternalError(e) => { + // NOTE: under normal conditions this shouldn't happen but we handle it anyway + warn!(%batch_id, error = ?e, %batch,"Could not send batch request"); + // register the failed download and check if the batch can be retried + if let Err(e) = batch.start_downloading(1) { + return self + .fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0)); + } + + match batch.download_failed(None) { + Err(e) => 
self.fail_sync(CustodyBackfillError::BatchInvalidState( + batch_id, e.0, + ))?, + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + self.fail_sync(CustodyBackfillError::BatchDownloadFailed(batch_id))? + } + Ok(BatchOperationOutcome::Continue) => { + return self.send_batch(network, batch_id); + } + } + } + }, + } + } + + Ok(()) + } + + /// The syncing process has failed. + /// + /// This resets past variables, to allow for a fresh start when resuming. + fn fail_sync(&mut self, error: CustodyBackfillError) -> Result<(), CustodyBackfillError> { + // Some errors shouldn't cause failure. + if matches!(error, CustodyBackfillError::Paused) { + return Ok(()); + } + + // Set the state + self.pause("Sync has failed".to_string()); + // Remove all batches and active requests. + self.batches.clear(); + self.restart_failed_sync = false; + + // Reset all downloading and processing targets + // NOTE: Lets keep validated_batches for posterity + self.processing_target = self.current_start; + self.to_be_downloaded = self.current_start; + self.last_batch_downloaded = false; + self.current_processing_batch = None; + self.restart_sync(); + + Err(error) + } + + pub fn state(&self) -> CustodyBackFillState { + self.network_globals.custody_sync_state.read().clone() + } + + /// Updates the global network state indicating the current state of a backfill sync. + pub fn set_state(&self, state: CustodyBackFillState) { + *self.network_globals.custody_sync_state.write() = state; + } + + /// A fully synced peer has joined us. + /// If we are in a failed state, update a local variable to indicate we are able to restart + /// the failed sync on the next attempt. 
+ pub fn fully_synced_peer_joined(&mut self) { + if matches!(self.state(), CustodyBackFillState::Pending(_)) { + self.restart_failed_sync = true; + } + } +} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index d7ba028054..338f21ce98 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -46,7 +46,8 @@ use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, }; -use crate::sync::network_context::PeerGroup; +use crate::sync::custody_backfill_sync::CustodyBackFillSync; +use crate::sync::network_context::{PeerGroup, RpcResponseResult}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{ @@ -56,14 +57,16 @@ use futures::StreamExt; use lighthouse_network::SyncInfo; use lighthouse_network::rpc::RPCError; use lighthouse_network::service::api_types::{ - BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyRequester, - DataColumnsByRangeRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, - SingleLookupReqId, SyncRequestId, + BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, + CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyRequester, + DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, + DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::{PeerAction, PeerId}; use logging::crit; use lru_cache::LRUTimeCache; +use slot_clock::SlotClock; use std::ops::Sub; use std::sync::Arc; use std::time::Duration; @@ -158,6 +161,12 @@ pub enum SyncMessage { result: BatchProcessResult, }, + /// A custody batch has been processed by the processor thread. 
+ CustodyBatchProcessed { + batch_id: CustodyBackfillBatchId, + result: CustodyBatchProcessResult, + }, + /// Block processed BlockComponentProcessed { process_type: BlockProcessType, @@ -209,6 +218,19 @@ pub enum BatchProcessResult { NonFaultyFailure, } +/// The result of processing multiple data columns. +#[derive(Debug)] +pub enum CustodyBatchProcessResult { + /// The custody batch was completed successfully. It carries whether the sent batch contained data columns. + Success { + #[allow(dead_code)] + sent_columns: usize, + imported_columns: usize, + }, + /// The custody batch processing failed. + Error { peer_action: Option }, +} + /// The primary object for handling and driving all the current syncing logic. It maintains the /// current state of the syncing process, the number of useful peers, downloaded blocks and /// controls the logic behind both the long-range (batch) sync and the on-going potential parent @@ -229,6 +251,9 @@ pub struct SyncManager { /// Backfill syncing. backfill_sync: BackFillSync, + /// Custody syncing. + custody_backfill_sync: CustodyBackFillSync, + block_lookups: BlockLookups, /// debounce duplicated `UnknownBlockHashFromAttestation` for the same root peer tuple. A peer /// may forward us thousands of a attestations, each one triggering an individual event. Only @@ -288,7 +313,8 @@ impl SyncManager { fork_context.clone(), ), range_sync: RangeSync::new(beacon_chain.clone()), - backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals), + backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals.clone()), + custody_backfill_sync: CustodyBackFillSync::new(beacon_chain.clone(), network_globals), block_lookups: BlockLookups::new(), notified_unknown_roots: LRUTimeCache::new(Duration::from_secs( NOTIFIED_UNKNOWN_ROOT_EXPIRY_SECONDS, @@ -549,6 +575,7 @@ impl SyncManager { // inform the backfill sync that a new synced peer has joined us. 
if new_state.is_synced() { self.backfill_sync.fully_synced_peer_joined(); + self.custody_backfill_sync.fully_synced_peer_joined(); } } is_connected @@ -558,17 +585,18 @@ impl SyncManager { } } - /// Updates the global sync state, optionally instigating or pausing a backfill sync as well as + /// Updates the global sync state, optionally instigating or pausing a backfill or custody sync as well as /// logging any changes. /// /// The logic for which sync should be running is as follows: - /// - If there is a range-sync running (or required) pause any backfill and let range-sync + /// - If there is a range-sync running (or required) pause any backfill/custody sync and let range-sync /// complete. /// - If there is no current range sync, check for any requirement to backfill and either /// start/resume a backfill sync if required. The global state will be BackFillSync if a /// backfill sync is running. /// - If there is no range sync and no required backfill and we have synced up to the currently /// known peers, we consider ourselves synced. + /// - If there is no range sync and no required backfill we check if we need to execute a custody sync. fn update_sync_state(&mut self) { let new_state: SyncState = match self.range_sync.state() { Err(e) => { @@ -624,15 +652,51 @@ impl SyncManager { error!(error = ?e, "Backfill sync failed to start"); } } + + // If backfill is complete, check if we have a pending custody backfill to complete + let anchor_info = self.chain.store.get_anchor_info(); + if anchor_info.block_backfill_complete(self.chain.genesis_backfill_slot) { + match self.custody_backfill_sync.start(&mut self.network) { + Ok(SyncStart::Syncing { + completed, + remaining, + }) => { + sync_state = SyncState::CustodyBackFillSyncing { + completed, + remaining, + }; + } + Ok(SyncStart::NotSyncing) => {} // Ignore updating the state if custody sync state didn't start. 
+ Err(e) => { + use crate::sync::custody_backfill_sync::CustodyBackfillError; + + match &e { + CustodyBackfillError::BatchDownloadFailed(_) + | CustodyBackfillError::BatchProcessingFailed(_) => { + debug!(error=?e, "Custody backfill batch processing or downloading failed"); + } + CustodyBackfillError::BatchInvalidState(_, reason) => { + error!(error=?e, reason, "Custody backfill sync failed due to invalid batch state") + } + CustodyBackfillError::InvalidSyncState(reason) => { + error!(error=?e, reason, "Custody backfill sync failed due to invalid sync state") + } + CustodyBackfillError::Paused => {} + } + } + } + } } // Return the sync state if backfilling is not required. sync_state } Some((RangeSyncType::Finalized, start_slot, target_slot)) => { - // If there is a backfill sync in progress pause it. + // Range sync is in progress. If there is a backfill or custody sync in progress pause it. #[cfg(not(feature = "disable-backfill"))] self.backfill_sync.pause(); + self.custody_backfill_sync + .pause("Range sync in progress".to_string()); SyncState::SyncingFinalized { start_slot, @@ -640,9 +704,12 @@ impl SyncManager { } } Some((RangeSyncType::Head, start_slot, target_slot)) => { - // If there is a backfill sync in progress pause it. + // Range sync is in progress. If there is a backfill or custody backfill sync + // in progress pause it. #[cfg(not(feature = "disable-backfill"))] self.backfill_sync.pause(); + self.custody_backfill_sync + .pause("Range sync in progress".to_string()); SyncState::SyncingHead { start_slot, @@ -662,7 +729,9 @@ impl SyncManager { if new_state.is_synced() && !matches!( old_state, - SyncState::Synced | SyncState::BackFillSyncing { .. } + SyncState::Synced + | SyncState::BackFillSyncing { .. } + | SyncState::CustodyBackFillSyncing { .. 
} ) { self.network.subscribe_core_topics(); @@ -693,6 +762,11 @@ impl SyncManager { let mut register_metrics_interval = tokio::time::interval(Duration::from_secs(5)); + // Trigger a sync state update every epoch. This helps check if we need to trigger a custody backfill sync. + let epoch_duration = + self.chain.slot_clock.slot_duration().as_secs() * T::EthSpec::slots_per_epoch(); + let mut epoch_interval = tokio::time::interval(Duration::from_secs(epoch_duration)); + // process any inbound messages loop { tokio::select! { @@ -711,6 +785,9 @@ impl SyncManager { _ = register_metrics_interval.tick() => { self.network.register_metrics(); } + _ = epoch_interval.tick() => { + self.update_sync_state(); + } } } } @@ -865,6 +942,21 @@ impl SyncManager { } } }, + SyncMessage::CustodyBatchProcessed { result, batch_id } => { + match self.custody_backfill_sync.on_batch_process_result( + &mut self.network, + batch_id, + &result, + ) { + Ok(ProcessResult::Successful) => {} + Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), + Err(error) => { + error!(error = ?error, "Custody sync failed"); + // Update the global status + self.update_sync_state(); + } + } + } } } @@ -1081,11 +1173,13 @@ impl SyncManager { RpcEvent::from_chunk(data_column, seen_timestamp), ); } - SyncRequestId::DataColumnsByRange(id) => self.on_data_columns_by_range_response( - id, - peer_id, - RpcEvent::from_chunk(data_column, seen_timestamp), - ), + SyncRequestId::DataColumnsByRange(req_id) => { + self.on_data_columns_by_range_response( + req_id, + peer_id, + RpcEvent::from_chunk(data_column, seen_timestamp), + ); + } _ => { crit!(%peer_id, "bad request id for data_column"); } @@ -1173,11 +1267,22 @@ impl SyncManager { .network .on_data_columns_by_range_response(id, peer_id, data_column) { - self.on_range_components_response( - id.parent_request_id, - peer_id, - RangeBlockComponent::CustodyColumns(id, resp), - ); + match id.parent_request_id { + 
DataColumnsByRangeRequester::ComponentsByRange(components_by_range_req_id) => { + self.on_range_components_response( + components_by_range_req_id, + peer_id, + RangeBlockComponent::CustodyColumns(id, resp), + ); + } + DataColumnsByRangeRequester::CustodyBackfillSync(custody_backfill_req_id) => self + .on_custody_backfill_columns_response( + custody_backfill_req_id, + id, + peer_id, + resp, + ), + } } } @@ -1267,6 +1372,36 @@ impl SyncManager { } } } + + /// Handles receiving a response for a custody range sync request that has columns. + fn on_custody_backfill_columns_response( + &mut self, + custody_sync_request_id: CustodyBackFillBatchRequestId, + req_id: DataColumnsByRangeRequestId, + peer_id: PeerId, + data_columns: RpcResponseResult>>>, + ) { + if let Some(resp) = self.network.custody_backfill_data_columns_response( + custody_sync_request_id, + req_id, + data_columns, + ) { + match self.custody_backfill_sync.on_data_column_response( + &mut self.network, + custody_sync_request_id, + &peer_id, + resp, + ) { + Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), + Ok(ProcessResult::Successful) => {} + Err(_e) => { + // The custody sync has failed, errors are reported + // within. + self.update_sync_state(); + } + } + } + } } impl From> for BlockProcessingResult { diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 4dab2e17d3..054bab654c 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -2,14 +2,17 @@ //! //! Stores the various syncing methods for the beacon chain. 
mod backfill_sync; +mod batch; mod block_lookups; mod block_sidecar_coupling; +mod custody_backfill_sync; pub mod manager; mod network_context; mod peer_sync_info; +mod range_data_column_batch_request; mod range_sync; #[cfg(test)] mod tests; pub use manager::{BatchProcessResult, SyncMessage}; -pub use range_sync::{BatchOperationOutcome, ChainId}; +pub use range_sync::ChainId; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 1d119cb2de..2e0c56db23 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -6,16 +6,17 @@ pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlock use super::SyncMessage; use super::block_sidecar_coupling::RangeBlockComponentsRequest; use super::manager::BlockProcessType; -use super::range_sync::ByRangeRequestType; use crate::metrics; use crate::network_beacon_processor::NetworkBeaconProcessor; #[cfg(test)] use crate::network_beacon_processor::TestBeaconChainType; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; +use crate::sync::batch::ByRangeRequestType; use crate::sync::block_lookups::SingleLookupId; use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest; +use crate::sync::range_data_column_batch_request::RangeDataColumnBatchRequest; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use custody::CustodyRequestResult; @@ -25,7 +26,8 @@ use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, Req pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, - CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRootRequestId, + 
CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyId, CustodyRequester, + DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; @@ -211,7 +213,6 @@ pub struct SyncNetworkContext { /// A mapping of active DataColumnsByRange requests data_columns_by_range_requests: ActiveRequests>, - /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, @@ -219,6 +220,10 @@ pub struct SyncNetworkContext { components_by_range_requests: FnvHashMap>, + /// A batch of data columns by range request for custody sync + custody_backfill_data_column_batch_requests: + FnvHashMap>, + /// Whether the ee is online. If it's not, we don't allow access to the /// `beacon_processor_send`. execution_engine_state: EngineState, @@ -295,6 +300,7 @@ impl SyncNetworkContext { data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), custody_by_root_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), + custody_backfill_data_column_batch_requests: FnvHashMap::default(), network_beacon_processor, chain, fork_context, @@ -324,6 +330,7 @@ impl SyncNetworkContext { custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests components_by_range_requests: _, + custody_backfill_data_column_batch_requests: _, execution_engine_state: _, network_beacon_processor: _, chain: _, @@ -354,7 +361,6 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); - blocks_by_root_ids .chain(blobs_by_root_ids) .chain(data_column_by_root_ids) @@ -421,6 +427,7 @@ impl SyncNetworkContext { custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests components_by_range_requests: 
_, + custody_backfill_data_column_batch_requests: _, execution_engine_state: _, network_beacon_processor: _, chain: _, @@ -503,7 +510,7 @@ impl SyncNetworkContext { count: *request.count(), columns, }, - id, + DataColumnsByRangeRequester::ComponentsByRange(id), new_range_request_span!( self, "outgoing_columns_by_range_retry", @@ -638,7 +645,7 @@ impl SyncNetworkContext { count: *request.count(), columns, }, - id, + DataColumnsByRangeRequester::ComponentsByRange(id), new_range_request_span!( self, "outgoing_columns_by_range", @@ -1238,7 +1245,7 @@ impl SyncNetworkContext { &mut self, peer_id: PeerId, request: DataColumnsByRangeRequest, - parent_request_id: ComponentsByRangeRequestId, + parent_request_id: DataColumnsByRangeRequester, request_span: Span, ) -> Result<(DataColumnsByRangeRequestId, Vec), RpcRequestSendError> { let requested_columns = request.columns.clone(); @@ -1679,6 +1686,111 @@ impl SyncNetworkContext { }) } + /// data column by range requests sent by the custody sync algorithm + pub fn custody_backfill_data_columns_batch_request( + &mut self, + request: DataColumnsByRangeRequest, + batch_id: CustodyBackfillBatchId, + peers: &HashSet, + peers_to_deprioritize: &HashSet, + ) -> Result { + let active_request_count_by_peer = self.active_request_count_by_peer(); + // Attempt to find all required custody peers before sending any request or creating an ID + let columns_by_range_peers_to_request = { + let column_indexes = self + .chain + .sampling_columns_for_epoch(batch_id.epoch) + .iter() + .cloned() + .collect(); + + self.select_columns_by_range_peers_to_request( + &column_indexes, + peers, + active_request_count_by_peer, + peers_to_deprioritize, + )? 
+ }; + + // Create the overall `custody_by_range` request id + let id = CustodyBackFillBatchRequestId { + id: self.next_id(), + batch_id, + }; + + let result = columns_by_range_peers_to_request + .iter() + .filter_map(|(peer_id, _)| { + self.send_data_columns_by_range_request( + *peer_id, + request.clone(), + DataColumnsByRangeRequester::CustodyBackfillSync(id), + Span::none(), + ) + .ok() + }) + .collect::>(); + + let range_data_column_batch_request = + RangeDataColumnBatchRequest::new(result, self.chain.clone(), batch_id.epoch); + + self.custody_backfill_data_column_batch_requests + .insert(id, range_data_column_batch_request); + + Ok(id) + } + + /// Received a data columns by range response from a custody sync request which batches them. + pub fn custody_backfill_data_columns_response( + &mut self, + // Identifies the custody backfill request for all data columns on this epoch + custody_sync_request_id: CustodyBackFillBatchRequestId, + // Identifies a specific data_columns_by_range request for *some* columns in this epoch. We + // pass them separately as DataColumnsByRangeRequestId parent is an enum and would require + // matching again. 
+ req_id: DataColumnsByRangeRequestId, + data_columns: RpcResponseResult>, + ) -> Option, RpcResponseError>> { + let Entry::Occupied(mut entry) = self + .custody_backfill_data_column_batch_requests + .entry(custody_sync_request_id) + else { + metrics::inc_counter_vec( + &metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, + &["range_data_columns"], + ); + return None; + }; + + if let Err(e) = { + let request = entry.get_mut(); + data_columns.and_then(|(data_columns, _)| { + request + .add_custody_columns(req_id, data_columns.clone()) + .map_err(|e| { + RpcResponseError::BlockComponentCouplingError(CouplingError::InternalError( + e, + )) + }) + }) + } { + entry.remove(); + return Some(Err(e)); + } + + if let Some(data_column_result) = entry.get_mut().responses() { + if data_column_result.is_ok() { + // remove the entry only if it coupled successfully with + // no errors + entry.remove(); + } + // If the request is finished, dequeue everything + Some(data_column_result.map_err(RpcResponseError::BlockComponentCouplingError)) + } else { + None + } + } + pub(crate) fn register_metrics(&self) { for (id, count) in [ ("blocks_by_root", self.blocks_by_root_requests.len()), diff --git a/beacon_node/network/src/sync/range_data_column_batch_request.rs b/beacon_node/network/src/sync/range_data_column_batch_request.rs new file mode 100644 index 0000000000..542d99d97c --- /dev/null +++ b/beacon_node/network/src/sync/range_data_column_batch_request.rs @@ -0,0 +1,297 @@ +use std::collections::{HashMap, HashSet}; + +use crate::sync::block_sidecar_coupling::{ByRangeRequest, CouplingError}; +use crate::sync::network_context::MAX_COLUMN_RETRIES; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use itertools::Itertools; +use lighthouse_network::PeerId; +use lighthouse_network::service::api_types::DataColumnsByRangeRequestId; +use std::sync::Arc; +use types::{ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Slot}; + +pub struct RangeDataColumnBatchRequest { + requests: 
HashMap< + DataColumnsByRangeRequestId, + ByRangeRequest>, + >, + /// The column indices corresponding to the request + column_peers: HashMap>, + expected_custody_columns: HashSet, + attempt: usize, + beacon_chain: Arc>, + epoch: Epoch, +} + +impl RangeDataColumnBatchRequest { + pub fn new( + by_range_requests: Vec<(DataColumnsByRangeRequestId, Vec)>, + beacon_chain: Arc>, + epoch: Epoch, + ) -> Self { + let requests = by_range_requests + .clone() + .into_iter() + .map(|(req, _)| (req, ByRangeRequest::Active(req))) + .collect::>(); + + let column_peers = by_range_requests.clone().into_iter().collect(); + + let expected_custody_columns = by_range_requests + .into_iter() + .flat_map(|(_, column_indices)| column_indices) + .collect(); + + Self { + requests, + column_peers, + expected_custody_columns, + beacon_chain, + epoch, + attempt: 0, + } + } + + pub fn add_custody_columns( + &mut self, + req_id: DataColumnsByRangeRequestId, + columns: Vec>>, + ) -> Result<(), String> { + let req = self + .requests + .get_mut(&req_id) + .ok_or(format!("unknown data columns by range req_id {req_id}"))?; + req.finish(req_id, columns) + } + + pub fn responses( + &mut self, + ) -> Option, CouplingError>> { + let mut received_columns_for_slot: HashMap> = + HashMap::new(); + let mut column_to_peer_id: HashMap = HashMap::new(); + + for column in self + .requests + .values() + .filter_map(|req| req.to_finished()) + .flatten() + { + received_columns_for_slot + .entry(column.slot()) + .or_default() + .push(column.clone()); + } + + // Note: this assumes that only 1 peer is responsible for a column + // with a batch. + for (id, columns) in self.column_peers.iter() { + for column in columns { + column_to_peer_id.insert(*column, id.peer); + } + } + + // An "attempt" is complete here after we have received a response for all the + // requests we made. i.e. `req.to_finished()` returns Some for all requests. 
+ self.attempt += 1; + + let resp = self.responses_with_custody_columns( + received_columns_for_slot, + column_to_peer_id, + &self.expected_custody_columns, + self.attempt, + ); + + if let Err(CouplingError::DataColumnPeerFailure { + error: _, + faulty_peers, + exceeded_retries: _, + }) = &resp + { + for (_, peer) in faulty_peers.iter() { + // find the req id associated with the peer and + // delete it from the entries as we are going to make + // a separate attempt for those components. + self.requests.retain(|&k, _| k.peer != *peer); + } + } + Some(resp) + } + + fn responses_with_custody_columns( + &self, + mut received_columns_for_slot: HashMap>, + column_to_peer: HashMap, + expected_custody_columns: &HashSet, + attempt: usize, + ) -> Result, CouplingError> { + let mut naughty_peers = vec![]; + let mut result: DataColumnSidecarList = vec![]; + + let forward_blocks_iter = self + .beacon_chain + .forwards_iter_block_roots_until( + self.epoch.start_slot(T::EthSpec::slots_per_epoch()), + self.epoch.end_slot(T::EthSpec::slots_per_epoch()), + ) + .map_err(|_| { + CouplingError::InternalError("Failed to fetch block root iterator".to_string()) + })?; + + for block_iter_result in forward_blocks_iter { + let (block_root, slot) = block_iter_result.map_err(|_| { + CouplingError::InternalError("Failed to iterate block roots".to_string()) + })?; + + let Some(block) = self + .beacon_chain + .get_blinded_block(&block_root) + .ok() + .flatten() + else { + // The block root we are fetching is from the forwards block root iterator. This doesn't seem like a possible scenario. + return Err(CouplingError::InternalError( + "Block root from forwards block iterator not found in db".to_string(), + )); + }; + + let Some(columns) = received_columns_for_slot.remove(&slot) else { + // If at least one blob is expected for this slot but none have been served, penalize all peers + // The slot check ensures we arent checking a skipped slot. 
+ if block.num_expected_blobs() != 0 && block.slot() == slot { + for column in expected_custody_columns { + if let Some(naughty_peer) = column_to_peer.get(column) { + naughty_peers.push((*column, *naughty_peer)); + } + } + } + continue; + }; + + // This is a skipped slot, skip to the next slot after we verify that peers + // didn't serve us columns for a skipped slot + if block.slot() != slot { + // If we received columns for a skipped slot, punish the peer + if !columns.is_empty() { + for column in expected_custody_columns { + if let Some(naughty_peer) = column_to_peer.get(column) { + naughty_peers.push((*column, *naughty_peer)); + } + } + } + + continue; + } + + let column_block_roots = columns + .iter() + .map(|column| column.block_root()) + .unique() + .collect::>(); + + let column_block_signatures = columns + .iter() + .map(|column| column.signed_block_header.signature.clone()) + .unique() + .collect::>(); + + let column_block_root = match column_block_roots.as_slice() { + // We expect a single unique block root + [column_block_root] => *column_block_root, + // If there are no block roots, penalize all peers + [] => { + for column in &columns { + if let Some(naughty_peer) = column_to_peer.get(&column.index) { + naughty_peers.push((column.index, *naughty_peer)); + } + } + continue; + } + // If theres more than one unique block root penalize the peers serving the bad block roots. 
+ column_block_roots => { + for column in columns { + if column_block_roots.contains(&column.block_root()) + && block_root != column.block_root() + && let Some(naughty_peer) = column_to_peer.get(&column.index) + { + naughty_peers.push((column.index, *naughty_peer)); + } + } + continue; + } + }; + + let column_block_signature = match column_block_signatures.as_slice() { + // We expect a single unique block signature + [block_signature] => block_signature, + // If there are no block signatures, penalize all peers + [] => { + for column in &columns { + if let Some(naughty_peer) = column_to_peer.get(&column.index) { + naughty_peers.push((column.index, *naughty_peer)); + } + } + continue; + } + // If theres more than one unique block signature, penalize the peers serving the + // invalid block signatures. + column_block_signatures => { + for column in columns { + if column_block_signatures.contains(&column.signed_block_header.signature) + && block.signature() != &column.signed_block_header.signature + && let Some(naughty_peer) = column_to_peer.get(&column.index) + { + naughty_peers.push((column.index, *naughty_peer)); + } + } + continue; + } + }; + + // if the block root doesn't match the columns block root, penalize the peers + if block_root != column_block_root { + for column in &columns { + if let Some(naughty_peer) = column_to_peer.get(&column.index) { + naughty_peers.push((column.index, *naughty_peer)); + } + } + } + + // If the block signature doesn't match the columns block signature, penalize the peers + if block.signature() != column_block_signature { + for column in &columns { + if let Some(naughty_peer) = column_to_peer.get(&column.index) { + naughty_peers.push((column.index, *naughty_peer)); + } + } + } + + let received_columns = columns.iter().map(|c| c.index).collect::>(); + + let missing_columns = received_columns + .difference(expected_custody_columns) + .collect::>(); + + // blobs are expected for this slot but there is at least one missing columns + // 
penalize the peers responsible for those columns. + if block.num_expected_blobs() != 0 && !missing_columns.is_empty() { + for column in missing_columns { + if let Some(naughty_peer) = column_to_peer.get(column) { + naughty_peers.push((*column, *naughty_peer)); + }; + } + } + + result.extend(columns); + } + + if !naughty_peers.is_empty() { + return Err(CouplingError::DataColumnPeerFailure { + error: "Bad or missing columns for some slots".to_string(), + faulty_peers: naughty_peers, + exceeded_retries: attempt >= MAX_COLUMN_RETRIES, + }); + } + + Ok(result) + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index ab5b8bee5e..014d728ffe 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,10 +1,13 @@ use super::RangeSyncType; -use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; +use crate::sync::batch::BatchId; +use crate::sync::batch::{ + BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, +}; use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError}; -use crate::sync::{BatchOperationOutcome, BatchProcessResult, network_context::SyncNetworkContext}; +use crate::sync::{BatchProcessResult, network_context::SyncNetworkContext}; use beacon_chain::BeaconChainTypes; use beacon_chain::block_verification_types::RpcBlock; use lighthouse_network::service::api_types::Id; @@ -12,6 +15,8 @@ use lighthouse_network::{PeerAction, PeerId}; use lighthouse_tracing::SPAN_SYNCING_CHAIN; use logging::crit; use std::collections::{BTreeMap, HashSet, btree_map::Entry}; +use std::hash::{Hash, Hasher}; +use std::marker::PhantomData; use strum::IntoStaticStr; use tracing::{Span, debug, instrument, warn}; use types::{ColumnIndex, Epoch, EthSpec, 
Hash256, Slot}; @@ -35,6 +40,35 @@ const BATCH_BUFFER_SIZE: u8 = 5; /// and continued is now in an inconsistent state. pub type ProcessingResult = Result; +type RpcBlocks = Vec>; +type RangeSyncBatchInfo = BatchInfo, RpcBlocks>; +type RangeSyncBatches = BTreeMap>; + +/// The number of times to retry a batch before it is considered failed. +const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; + +/// Invalid batches are attempted to be re-downloaded from other peers. If a batch cannot be processed +/// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. +const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; + +pub struct RangeSyncBatchConfig { + marker: PhantomData, +} + +impl BatchConfig for RangeSyncBatchConfig { + fn max_batch_download_attempts() -> u8 { + MAX_BATCH_DOWNLOAD_ATTEMPTS + } + fn max_batch_processing_attempts() -> u8 { + MAX_BATCH_PROCESSING_ATTEMPTS + } + fn batch_attempt_hash(data: &D) -> u64 { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + data.hash(&mut hasher); + hasher.finish() + } +} + /// Reasons for removing a chain #[derive(Debug)] #[allow(dead_code)] @@ -55,7 +89,6 @@ pub struct KeepChain; /// A chain identifier pub type ChainId = Id; -pub type BatchId = Epoch; #[derive(Debug, Copy, Clone, IntoStaticStr)] pub enum SyncingChainType { @@ -85,7 +118,7 @@ pub struct SyncingChain { pub target_head_root: Hash256, /// Sorted map of batches undergoing some kind of processing. - batches: BTreeMap>, + batches: RangeSyncBatches, /// The peers that agree on the `target_head_slot` and `target_head_root` as a canonical chain /// and thus available to download this chain from, as well as the batches we are currently @@ -249,7 +282,7 @@ impl SyncingChain { // request_id matches // TODO(das): removed peer_id matching as the node may request a different peer for data // columns. 
- if !batch.is_expecting_block(&request_id) { + if !batch.is_expecting_request_id(&request_id) { return Ok(KeepChain); } batch @@ -260,7 +293,8 @@ impl SyncingChain { // Remove the request from the peer's active batches // TODO(das): should use peer group here https://github.com/sigp/lighthouse/issues/6258 - let received = batch.download_completed(blocks, *peer_id)?; + let received = blocks.len(); + batch.download_completed(blocks, *peer_id)?; let awaiting_batches = batch_id .saturating_sub(self.optimistic_start.unwrap_or(self.processing_target)) / EPOCHS_PER_BATCH; @@ -918,7 +952,7 @@ impl SyncingChain { // A batch could be retried without the peer failing the request (disconnecting/ // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer - if !batch.is_expecting_block(&request_id) { + if !batch.is_expecting_request_id(&request_id) { debug!( batch_epoch = %batch_id, batch_state = ?batch.state(), @@ -1233,7 +1267,7 @@ impl SyncingChain { // only request batches up to the buffer size limit // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync // if the current processing window is contained in a long range of skip slots. - let in_buffer = |batch: &BatchInfo| { + let in_buffer = |batch: &RangeSyncBatchInfo| { matches!( batch.state(), BatchState::Downloading(..) | BatchState::AwaitingProcessing(..) @@ -1320,7 +1354,7 @@ impl SyncingChain { } } -use super::batch::WrongState as WrongBatchState; +use crate::sync::batch::WrongState as WrongBatchState; impl From for RemoveChain { fn from(err: WrongBatchState) -> Self { RemoveChain::WrongBatchState(err.0) diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 8f881fba90..dd9f17bfd1 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -1,17 +1,11 @@ //! 
This provides the logic for syncing a chain when the local node is far behind it's current //! peers. - -mod batch; mod chain; mod chain_collection; mod range; mod sync_type; -pub use batch::{ - BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, - ByRangeRequestType, -}; -pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; +pub use chain::{ChainId, EPOCHS_PER_BATCH}; #[cfg(test)] pub use chain_collection::SyncChainStatus; pub use range::RangeSync; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 465edd3697..c9656ad1d0 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -39,12 +39,13 @@ //! Each chain is downloaded in batches of blocks. The batched blocks are processed sequentially //! and further batches are requested as current blocks are being processed. -use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain}; +use super::chain::{ChainId, RemoveChain, SyncingChain}; use super::chain_collection::{ChainCollection, SyncChainStatus}; use super::sync_type::RangeSyncType; use crate::metrics; use crate::status::ToStatusMessage; use crate::sync::BatchProcessResult; +use crate::sync::batch::BatchId; use crate::sync::network_context::{RpcResponseError, SyncNetworkContext}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes}; diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index d58cf2e731..895afa4f33 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -949,6 +949,19 @@ impl, Cold: ItemStore> HotColdDB )); } + pub fn data_column_as_kv_store_ops( + &self, + block_root: &Hash256, + data_column: Arc>, + ops: &mut Vec, + ) { + ops.push(KeyValueStoreOp::PutKeyValue( + DBColumn::BeaconDataColumn, + get_data_column_key(block_root, &data_column.index), + 
data_column.as_ssz_bytes(), + )); + } + pub fn put_data_column_custody_info( + &self, + earliest_data_column_slot: Option, diff --git a/common/eth2/src/lighthouse/sync_state.rs b/common/eth2/src/lighthouse/sync_state.rs index 0327f7073f..9f6f3b52e0 100644 --- a/common/eth2/src/lighthouse/sync_state.rs +++ b/common/eth2/src/lighthouse/sync_state.rs @@ -15,6 +15,10 @@ pub enum SyncState { /// specified by its peers. Once completed, the node enters this sync state and attempts to /// download all required historical blocks. BackFillSyncing { completed: usize, remaining: usize }, + /// The node is undertaking a custody backfill sync. This occurs for a node that has completed forward and + /// backfill sync and has undergone a custody count change. During custody backfill sync the node attempts + /// to backfill its new column custody requirements up to the data availability window. + CustodyBackFillSyncing { completed: usize, remaining: usize }, /// The node has completed syncing a finalized chain and is in the process of re-evaluating /// which sync state to progress to. SyncTransition, @@ -39,6 +43,17 @@ pub enum BackFillState { Failed, } +#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)] +/// The state of the custody backfill sync. +pub enum CustodyBackFillState { + /// We are currently backfilling custody columns. + Syncing, + /// A custody backfill sync has completed. + Completed, + /// A custody backfill sync is set to `Pending` for various reasons. + Pending(String), +} + impl PartialEq for SyncState { fn eq(&self, other: &Self) -> bool { matches!( @@ -54,6 +69,10 @@ impl PartialEq for SyncState { SyncState::BackFillSyncing { .. }, SyncState::BackFillSyncing { .. } ) + | ( + SyncState::CustodyBackFillSyncing { .. }, + SyncState::CustodyBackFillSyncing { .. } + ) ) } } @@ -65,8 +84,8 @@ impl SyncState { SyncState::SyncingFinalized { .. } => true, SyncState::SyncingHead { .. 
} => true, SyncState::SyncTransition => true, - // Backfill doesn't effect any logic, we consider this state, not syncing. - SyncState::BackFillSyncing { .. } => false, + // Neither backfill nor custody backfill affects any logic, so we consider this state not syncing. + SyncState::BackFillSyncing { .. } | SyncState::CustodyBackFillSyncing { .. } => false, SyncState::Synced => false, SyncState::Stalled => false, } } @@ -77,7 +96,7 @@ impl SyncState { SyncState::SyncingFinalized { .. } => true, SyncState::SyncingHead { .. } => false, SyncState::SyncTransition => false, - SyncState::BackFillSyncing { .. } => false, + SyncState::BackFillSyncing { .. } | SyncState::CustodyBackFillSyncing { .. } => false, SyncState::Synced => false, SyncState::Stalled => false, } } @@ -87,7 +106,12 @@ impl SyncState { /// /// NOTE: We consider the node synced if it is fetching old historical blocks. pub fn is_synced(&self) -> bool { - matches!(self, SyncState::Synced | SyncState::BackFillSyncing { .. }) + matches!( + self, + SyncState::Synced + | SyncState::BackFillSyncing { .. } + | SyncState::CustodyBackFillSyncing { .. } + ) } /// Returns true if the node is *stalled*, i.e. has no synced peers. @@ -108,6 +132,9 @@ impl std::fmt::Display for SyncState { SyncState::Stalled => write!(f, "Stalled"), SyncState::SyncTransition => write!(f, "Evaluating known peers"), SyncState::BackFillSyncing { .. } => write!(f, "Syncing Historical Blocks"), + SyncState::CustodyBackFillSyncing { .. 
} => { + write!(f, "Syncing Historical Data Columns") + } } } } diff --git a/consensus/types/src/slot_epoch.rs b/consensus/types/src/slot_epoch.rs index 857044f981..05af9c5232 100644 --- a/consensus/types/src/slot_epoch.rs +++ b/consensus/types/src/slot_epoch.rs @@ -33,6 +33,13 @@ pub struct Slot(#[serde(with = "serde_utils::quoted_u64")] u64); #[serde(transparent)] pub struct Epoch(#[serde(with = "serde_utils::quoted_u64")] u64); +impl Epoch { + /// Returns an iterator over `end..=start` in reverse, i.e. yielding epochs from `start` down to `end` inclusive. + pub fn range_inclusive_rev(start: Self, end: Self) -> impl Iterator { + (end.0..=start.0).rev().map(Epoch) + } +} + impl_common!(Slot); impl_common!(Epoch); diff --git a/scripts/tests/checkpoint-sync.sh b/scripts/tests/checkpoint-sync.sh index df03da042e..605dc504f5 100755 --- a/scripts/tests/checkpoint-sync.sh +++ b/scripts/tests/checkpoint-sync.sh @@ -102,7 +102,8 @@ node_completed["fullnode"]=false echo "Polling sync status until backfill reaches ${TARGET_BACKFILL_SLOTS} slots or timeout of ${TIMEOUT_MINS} mins" -while [ "${node_completed[supernode]}" = false ] || [ "${node_completed[fullnode]}" = false ]; do +# while [ "${node_completed[supernode]}" = false ] || [ "${node_completed[fullnode]}" = false ]; do +while [ "${node_completed[fullnode]}" = false ]; do current_time=$(date +%s) elapsed=$((current_time - start_time)) @@ -112,7 +113,8 @@ while [ "${node_completed[supernode]}" = false ] || [ "${node_completed[fullnode fi # Poll each node that hasn't completed yet - for node in "supernode" "fullnode"; do + # for node in "supernode" "fullnode"; do + for node in "fullnode"; do if [ "${node_completed[$node]}" = false ]; then poll_node "$node" fi @@ -121,7 +123,7 @@ while [ "${node_completed[supernode]}" = false ] || [ "${node_completed[fullnode sleep $POLL_INTERVAL_SECS done -echo "Sync test complete! Both supernode and fullnode have synced to HEAD and backfilled ${TARGET_BACKFILL_SLOTS} slots." 
-echo "Supernode time: $((node_complete_time[supernode] - start_time)) seconds" +echo "Sync test complete! Fullnode has synced to HEAD and backfilled ${TARGET_BACKFILL_SLOTS} slots." +# echo "Supernode time: $((node_complete_time[supernode] - start_time)) seconds" echo "Fullnode time: $((node_complete_time[fullnode] - start_time)) seconds" exit_and_dump_logs 0 \ No newline at end of file