diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e299bea2da..152de1a20b 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -6991,6 +6991,95 @@ impl BeaconChain { } } + /// Safely update data column custody info by ensuring that: + /// - cgc values at the updated epoch and the earliest custodied column epoch are equal + /// - we are only decrementing the earliest custodied data column epoch by one epoch + /// - the new earliest data column slot is set to the first slot in `effective_epoch`. + pub fn safely_backfill_data_column_custody_info( + &self, + effective_epoch: Epoch, + ) -> Result<(), Error> { + let Some(earliest_data_column_epoch) = self.earliest_custodied_data_column_epoch() else { + return Ok(()); + }; + + if effective_epoch >= earliest_data_column_epoch { + return Ok(()); + } + + let cgc_at_effective_epoch = self + .data_availability_checker + .custody_context() + .custody_group_count_at_epoch(effective_epoch, &self.spec); + + let cgc_at_earliest_data_colum_epoch = self + .data_availability_checker + .custody_context() + .custody_group_count_at_epoch(earliest_data_column_epoch, &self.spec); + + let can_update_data_column_custody_info = cgc_at_effective_epoch + == cgc_at_earliest_data_colum_epoch + && effective_epoch == earliest_data_column_epoch - 1; + + if can_update_data_column_custody_info { + self.store.put_data_column_custody_info(Some( + effective_epoch.start_slot(T::EthSpec::slots_per_epoch()), + ))?; + } else { + error!( + ?cgc_at_effective_epoch, + ?cgc_at_earliest_data_colum_epoch, + ?effective_epoch, + ?earliest_data_column_epoch, + "Couldn't update data column custody info" + ); + return Err(Error::FailedColumnCustodyInfoUpdate); + } + + Ok(()) + } + + /// Compare columns custodied for `epoch` versus columns custodied for the head of the chain + /// and return any column indices that are missing. 
+ pub fn get_missing_columns_for_epoch(&self, epoch: Epoch) -> HashSet { + let custody_context = self.data_availability_checker.custody_context(); + + let columns_required = custody_context + .custody_columns_for_epoch(None, &self.spec) + .iter() + .cloned() + .collect::>(); + + let current_columns_at_epoch = custody_context + .custody_columns_for_epoch(Some(epoch), &self.spec) + .iter() + .cloned() + .collect::>(); + + columns_required + .difference(¤t_columns_at_epoch) + .cloned() + .collect::>() + } + + /// The da boundary for custodying columns. It will just be the DA boundary unless we are near the Fulu fork epoch. + pub fn get_column_da_boundary(&self) -> Option { + match self.data_availability_boundary() { + Some(da_boundary_epoch) => { + if let Some(fulu_fork_epoch) = self.spec.fulu_fork_epoch { + if da_boundary_epoch < fulu_fork_epoch { + Some(fulu_fork_epoch) + } else { + Some(da_boundary_epoch) + } + } else { + None + } + } + None => None, // If no DA boundary set, dont try to custody backfill + } + } + /// This method serves to get a sense of the current chain health. It is used in block proposal /// to determine whether we should outsource payload production duties. 
/// diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 7b04a36fae..d4eba2b0ea 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -247,6 +247,7 @@ pub enum BeaconChainError { cache_epoch: Epoch, }, SkipProposerPreparation, + FailedColumnCustodyInfoUpdate, } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/beacon_chain/src/historical_data_columns.rs b/beacon_node/beacon_chain/src/historical_data_columns.rs new file mode 100644 index 0000000000..7e196eb75e --- /dev/null +++ b/beacon_node/beacon_chain/src/historical_data_columns.rs @@ -0,0 +1,151 @@ +use std::collections::{HashMap, HashSet}; + +use crate::{ + BeaconChain, BeaconChainError, BeaconChainTypes, + data_column_verification::verify_kzg_for_data_column_list, +}; +use store::{Error as StoreError, KeyValueStore}; +use tracing::{Span, debug, instrument}; +use types::{ColumnIndex, DataColumnSidecarList, Epoch, EthSpec, Hash256, Slot}; + +#[derive(Debug)] +pub enum HistoricalDataColumnError { + // The provided data column sidecar pertains to a block that doesn't exist in the database. + NoBlockFound { + data_column_block_root: Hash256, + expected_block_root: Hash256, + }, + + /// Logic error: should never occur. + IndexOutOfBounds, + + /// The provided data column sidecar list doesn't contain columns for the full range of slots for the given epoch. + MissingDataColumns { + missing_slots_and_data_columns: Vec<(Slot, ColumnIndex)>, + }, + + /// The provided data column sidecar list contains at least one column with an invalid kzg commitment. + InvalidKzg, + + /// Internal store error + StoreError(StoreError), + + /// Internal beacon chain error + BeaconChainError(Box), +} + +impl From for HistoricalDataColumnError { + fn from(e: StoreError) -> Self { + Self::StoreError(e) + } +} + +impl BeaconChain { + /// Store a batch of historical data columns in the database. 
+ /// + /// The data columns block roots and proposer signatures are verified with the existing + /// block stored in the DB. This function also verifies the columns KZG committments. + /// + /// This function requires that the data column sidecar list contains columns for a full epoch. + /// + /// Return the number of `data_columns` successfully imported. + #[instrument(skip_all, fields(columns_imported_count = tracing::field::Empty ))] + pub fn import_historical_data_column_batch( + &self, + epoch: Epoch, + historical_data_column_sidecar_list: DataColumnSidecarList, + ) -> Result { + let mut total_imported = 0; + let mut ops = vec![]; + + let unique_column_indices = historical_data_column_sidecar_list + .iter() + .map(|item| item.index) + .collect::>(); + + let mut slot_and_column_index_to_data_columns = historical_data_column_sidecar_list + .iter() + .map(|data_column| ((data_column.slot(), data_column.index), data_column)) + .collect::>(); + + let forward_blocks_iter = self + .forwards_iter_block_roots_until( + epoch.start_slot(T::EthSpec::slots_per_epoch()), + epoch.end_slot(T::EthSpec::slots_per_epoch()), + ) + .map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?; + + for block_iter_result in forward_blocks_iter { + let (block_root, slot) = block_iter_result + .map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?; + + for column_index in unique_column_indices.clone() { + if let Some(data_column) = + slot_and_column_index_to_data_columns.remove(&(slot, column_index)) + { + if self + .store + .get_data_column(&block_root, &data_column.index)? 
+ .is_some() + { + debug!( + block_root = ?block_root, + column_index = data_column.index, + "Skipping data column import as identical data column exists" + ); + continue; + } + if block_root != data_column.block_root() { + return Err(HistoricalDataColumnError::NoBlockFound { + data_column_block_root: data_column.block_root(), + expected_block_root: block_root, + }); + } + self.store.data_column_as_kv_store_ops( + &block_root, + data_column.clone(), + &mut ops, + ); + total_imported += 1; + } + } + } + + // If we've made it to here with no columns to import, this means there are no blobs for this epoch. + // `RangeDataColumnBatchRequest` logic should have caught any bad peers withholding columns + if historical_data_column_sidecar_list.is_empty() { + if !ops.is_empty() { + // This shouldn't be a valid case. If there are no columns to import, + // there should be no generated db operations. + return Err(HistoricalDataColumnError::IndexOutOfBounds); + } + } else { + verify_kzg_for_data_column_list(historical_data_column_sidecar_list.iter(), &self.kzg) + .map_err(|_| HistoricalDataColumnError::InvalidKzg)?; + + self.store.blobs_db.do_atomically(ops)?; + } + + if !slot_and_column_index_to_data_columns.is_empty() { + debug!( + ?epoch, + extra_data = ?slot_and_column_index_to_data_columns.keys().map(|(slot, _)| slot), + "We've received unexpected extra data columns, these will not be imported" + ); + } + + self.data_availability_checker + .custody_context() + .update_and_backfill_custody_count_at_epoch(epoch); + + self.safely_backfill_data_column_custody_info(epoch) + .map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?; + + debug!(?epoch, total_imported, "Imported historical data columns"); + + let current_span = Span::current(); + current_span.record("columns_imported_count", total_imported); + + Ok(total_imported) + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 9d8c3dba38..fd2162e7d3 100644 --- 
a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -28,6 +28,7 @@ pub mod fork_choice_signal; pub mod fork_revert; pub mod graffiti_calculator; pub mod historical_blocks; +pub mod historical_data_columns; pub mod kzg_utils; pub mod light_client_finality_update_verification; pub mod light_client_optimistic_update_verification; diff --git a/beacon_node/beacon_chain/src/validator_custody.rs b/beacon_node/beacon_chain/src/validator_custody.rs index 3ab76828c9..ea1dfdaae0 100644 --- a/beacon_node/beacon_chain/src/validator_custody.rs +++ b/beacon_node/beacon_chain/src/validator_custody.rs @@ -10,7 +10,7 @@ use types::data_column_custody_group::{CustodyIndex, compute_columns_for_custody use types::{ChainSpec, ColumnIndex, Epoch, EthSpec, Slot}; /// A delay before making the CGC change effective to the data availability checker. -const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30; +pub const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30; /// Number of slots after which a validator's registration is removed if it has not re-registered. const VALIDATOR_REGISTRATION_EXPIRY_SLOTS: Slot = Slot::new(256); @@ -30,8 +30,10 @@ struct ValidatorRegistrations { /// /// Note: Only stores the epoch value when there's a change in custody requirement. /// So if epoch 10 and 11 has the same custody requirement, only 10 is stored. - /// This map is never pruned, because currently we never decrease custody requirement, so this - /// map size is contained at 128. + /// This map is only pruned during custody backfill. If epoch 11 has custody requirements + /// that are then backfilled to epoch 10, the value at epoch 11 will be removed and epoch 10 + /// will be added to the map instead. This should keep map size constrained to a maximum + /// value of 128. 
epoch_validator_custody_requirements: BTreeMap, } @@ -99,6 +101,25 @@ impl ValidatorRegistrations { None } } + + /// Updates the `epoch_validator_custody_requirements` map by pruning all values on/after `effective_epoch` + /// and updating the map to store the latest validator custody requirements for the `effective_epoch`. + pub fn backfill_validator_custody_requirements(&mut self, effective_epoch: Epoch) { + if let Some(latest_validator_custody) = self.latest_validator_custody_requirement() { + // Delete records if + // 1. The epoch is greater than or equal than `effective_epoch` + // 2. the cgc requirements match the latest validator custody requirements + self.epoch_validator_custody_requirements + .retain(|&epoch, custody_requirement| { + !(epoch >= effective_epoch && *custody_requirement == latest_validator_custody) + }); + + self.epoch_validator_custody_requirements + .entry(effective_epoch) + .and_modify(|old_custody| *old_custody = latest_validator_custody) + .or_insert(latest_validator_custody); + } + } } /// Given the `validator_custody_units`, return the custody requirement based on @@ -250,6 +271,7 @@ impl CustodyContext { ); return Some(CustodyCountChanged { new_custody_group_count: updated_cgc, + old_custody_group_count: current_cgc, sampling_count: self.num_of_custody_groups_to_sample(effective_epoch, spec), effective_epoch, }); @@ -282,7 +304,7 @@ impl CustodyContext { /// minimum sampling size which may exceed the custody group count (CGC). /// /// See also: [`Self::num_of_custody_groups_to_sample`]. 
- fn custody_group_count_at_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> u64 { + pub fn custody_group_count_at_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> u64 { if self.current_is_supernode { spec.number_of_custody_groups } else { @@ -360,14 +382,22 @@ impl CustodyContext { .all_custody_columns_ordered .get() .expect("all_custody_columns_ordered should be initialized"); + &all_columns_ordered[..custody_group_count] } + + pub fn update_and_backfill_custody_count_at_epoch(&self, effective_epoch: Epoch) { + self.validator_registrations + .write() + .backfill_validator_custody_requirements(effective_epoch); + } } /// The custody count changed because of a change in the /// number of validators being managed. pub struct CustodyCountChanged { pub new_custody_group_count: u64, + pub old_custody_group_count: u64, pub sampling_count: u64, pub effective_epoch: Epoch, } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index ec5c1c90db..69d16b3071 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -4,12 +4,14 @@ use beacon_chain::attestation_verification::Error as AttnError; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::builder::BeaconChainBuilder; use beacon_chain::data_availability_checker::AvailableBlock; +use beacon_chain::historical_data_columns::HistoricalDataColumnError; use beacon_chain::schema_change::migrate_schema; use beacon_chain::test_utils::SyncCommitteeStrategy; use beacon_chain::test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, DiskHarnessType, get_kzg, mock_execution_layer_from_parts, test_spec, }; +use beacon_chain::validator_custody::CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS; use beacon_chain::{ BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot, BlockError, ChainConfig, NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped, @@ -3169,6 +3171,245 @@ async fn 
weak_subjectivity_sync_test( assert_eq!(store.get_anchor_info().state_upper_limit, Slot::new(0)); } +#[tokio::test] +async fn test_import_historical_data_columns_batch() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Epoch::new(0).start_slot(E::slots_per_epoch()) + 1; + let end_slot = Epoch::new(0).end_slot(E::slots_per_epoch()); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + for data_column in data_columns.unwrap() { + data_columns_list.push(data_column); + } + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + harness + .chain + .import_historical_data_column_batch(Epoch::new(0), data_columns_list) + .unwrap(); + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in 
block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()) + } +} + +// This should verify that a data column sidecar containing mismatched block roots should fail to be imported. +#[tokio::test] +async fn test_import_historical_data_columns_batch_mismatched_block_root() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Slot::new(1); + let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + + for data_column in data_columns.unwrap() { + let mut data_column = (*data_column).clone(); + if data_column.index % 2 == 0 { + data_column.signed_block_header.message.body_root = Hash256::ZERO; + } + + data_columns_list.push(Arc::new(data_column)); + } + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let 
data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + let error = harness + .chain + .import_historical_data_column_batch( + start_slot.epoch(E::slots_per_epoch()), + data_columns_list, + ) + .unwrap_err(); + + assert!(matches!( + error, + HistoricalDataColumnError::NoBlockFound { .. } + )); +} + +// This should verify that a data column sidecar associated to a block root that doesn't exist in the store cannot +// be imported. +#[tokio::test] +async fn test_import_historical_data_columns_batch_no_block_found() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Slot::new(1); + let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + + for data_column in data_columns.unwrap() { + let mut data_column = (*data_column).clone(); + data_column.signed_block_header.message.body_root = Hash256::ZERO; + data_columns_list.push(Arc::new(data_column)); + } + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + 
.forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + let error = harness + .chain + .import_historical_data_column_batch(Epoch::new(0), data_columns_list) + .unwrap_err(); + + assert!(matches!( + error, + HistoricalDataColumnError::NoBlockFound { .. } + )); +} + /// Test that blocks and attestations that refer to states around an unaligned split state are /// processed correctly. #[tokio::test] @@ -4845,6 +5086,166 @@ async fn test_custody_column_filtering_supernode() { ); } +#[tokio::test] +async fn test_missing_columns_after_cgc_change() { + let spec = test_spec::(); + + let num_validators = 8; + + let num_epochs_before_increase = 4; + + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec.clone().into()) + .deterministic_keypairs(num_validators) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + let state = harness.chain.head_beacon_state_cloned(); + + if !state.fork_name_unchecked().fulu_enabled() { + return; + } + + let custody_context = harness.chain.data_availability_checker.custody_context(); + + harness.advance_slot(); + harness + .extend_chain( + (E::slots_per_epoch() * num_epochs_before_increase) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let epoch_before_increase = Epoch::new(num_epochs_before_increase); + + let missing_columns = harness + .chain + .get_missing_columns_for_epoch(epoch_before_increase); + + // We should have no missing columns + assert_eq!(missing_columns.len(), 0); + + let epoch_after_increase = Epoch::new(num_epochs_before_increase + 2); + + let cgc_change_slot = epoch_before_increase.end_slot(E::slots_per_epoch()); + custody_context.register_validators(vec![(1, 32_000_000_000 * 9)], cgc_change_slot, &spec); + + harness.advance_slot(); + 
harness + .extend_chain( + (E::slots_per_epoch() * 5) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // We should have missing columns from before the cgc increase + let missing_columns = harness + .chain + .get_missing_columns_for_epoch(epoch_before_increase); + + assert!(!missing_columns.is_empty()); + + // We should have no missing columns after the cgc increase + let missing_columns = harness + .chain + .get_missing_columns_for_epoch(epoch_after_increase); + + assert!(missing_columns.is_empty()); +} + +#[tokio::test] +async fn test_safely_backfill_data_column_custody_info() { + let spec = test_spec::(); + + let num_validators = 8; + + let start_epochs = 4; + + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec.clone().into()) + .deterministic_keypairs(num_validators) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + let state = harness.chain.head_beacon_state_cloned(); + + if !state.fork_name_unchecked().fulu_enabled() { + return; + } + + let custody_context = harness.chain.data_availability_checker.custody_context(); + + harness.advance_slot(); + harness + .extend_chain( + (E::slots_per_epoch() * start_epochs) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let epoch_before_increase = Epoch::new(start_epochs); + let effective_delay_slots = + CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS / harness.chain.spec.seconds_per_slot; + + let cgc_change_slot = epoch_before_increase.end_slot(E::slots_per_epoch()); + + custody_context.register_validators(vec![(1, 32_000_000_000 * 16)], cgc_change_slot, &spec); + + let epoch_after_increase = + (cgc_change_slot + effective_delay_slots).epoch(E::slots_per_epoch()); + + harness.advance_slot(); + harness + .extend_chain( + (E::slots_per_epoch() * 5) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let head_slot = 
harness.chain.head().snapshot.beacon_block.slot(); + + harness + .chain + .update_data_column_custody_info(Some(head_slot)); + + // We can only safely update custody column info 1 epoch at a time + // Skipping an epoch should return an error + harness + .chain + .safely_backfill_data_column_custody_info(head_slot.epoch(E::slots_per_epoch()) - 2) + .unwrap_err(); + + // Iterate from the head epoch back to 0 and try to backfill data column custody info + for epoch in (0..head_slot.epoch(E::slots_per_epoch()).into()).rev() { + // This is an epoch before the cgc change took into effect, we shouldnt be able to update + // without performing custody backfill sync + if epoch <= epoch_after_increase.into() { + harness + .chain + .safely_backfill_data_column_custody_info(Epoch::new(epoch)) + .unwrap_err(); + } else { + // This is an epoch after the cgc change took into effect, we should be able to update + // as long as we iterate epoch by epoch + harness + .chain + .safely_backfill_data_column_custody_info(Epoch::new(epoch)) + .unwrap(); + let earliest_available_epoch = harness + .chain + .earliest_custodied_data_column_epoch() + .unwrap(); + assert_eq!(Epoch::new(epoch), earliest_available_epoch); + } + } +} + /// Checks that two chains are the same, for the purpose of these tests. /// /// Several fields that are hard/impossible to check are ignored (e.g., the store). diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index c83cdad7e0..10d9587ccc 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -57,6 +57,9 @@ pub fn spawn_notifier( // Store info if we are required to do a backfill sync. let original_oldest_block_slot = beacon_chain.store.get_anchor_info().oldest_block_slot; + // Use this info during custody backfill sync. + let mut original_earliest_data_column_slot = None; + let interval_future = async move { // Perform pre-genesis logging. 
loop { @@ -80,6 +83,7 @@ pub fn spawn_notifier( // Perform post-genesis logging. let mut last_backfill_log_slot = None; + let mut last_custody_backfill_log_slot = None; loop { // Run the notifier half way through each slot. @@ -112,6 +116,18 @@ pub fn spawn_notifier( let mut speedo = speedo.lock().await; speedo.clear(); } + (_, SyncState::CustodyBackFillSyncing { .. }) => { + // We have transitioned to a custody backfill sync. Reset the speedo. + let mut speedo = speedo.lock().await; + last_custody_backfill_log_slot = None; + speedo.clear(); + } + (SyncState::CustodyBackFillSyncing { .. }, _) => { + // We have transitioned from a custody backfill sync, reset the speedo + let mut speedo = speedo.lock().await; + last_custody_backfill_log_slot = None; + speedo.clear(); + } (_, _) => {} } current_sync_state = sync_state; @@ -154,6 +170,38 @@ pub fn spawn_notifier( Instant::now(), ); } + SyncState::CustodyBackFillSyncing { .. } => { + match beacon_chain.store.get_data_column_custody_info() { + Ok(data_column_custody_info) => { + if let Some(earliest_data_column_slot) = data_column_custody_info + .and_then(|info| info.earliest_data_column_slot) + && let Some(da_boundary) = beacon_chain.get_column_da_boundary() + { + sync_distance = earliest_data_column_slot.saturating_sub( + da_boundary.start_slot(T::EthSpec::slots_per_epoch()), + ); + + // We keep track of our starting point for custody backfill sync + // so we can measure our speed of progress. + if original_earliest_data_column_slot.is_none() { + original_earliest_data_column_slot = + Some(earliest_data_column_slot) + } + + if let Some(original_earliest_data_column_slot) = + original_earliest_data_column_slot + { + speedo.observe( + original_earliest_data_column_slot + .saturating_sub(earliest_data_column_slot), + Instant::now(), + ); + } + } + } + Err(e) => error!(error=?e, "Unable to get data column custody info"), + } + } SyncState::SyncingFinalized { .. } | SyncState::SyncingHead { .. 
} | SyncState::SyncTransition => { @@ -190,6 +238,8 @@ pub fn spawn_notifier( // Log if we are backfilling. let is_backfilling = matches!(current_sync_state, SyncState::BackFillSyncing { .. }); + let is_custody_backfilling = + matches!(current_sync_state, SyncState::CustodyBackFillSyncing { .. }); if is_backfilling && last_backfill_log_slot .is_none_or(|slot| slot + BACKFILL_LOG_INTERVAL <= current_slot) @@ -234,6 +284,51 @@ pub fn spawn_notifier( info!("Historical block download complete"); } + if is_custody_backfilling + && last_custody_backfill_log_slot + .is_none_or(|slot| slot + BACKFILL_LOG_INTERVAL <= current_slot) + { + last_custody_backfill_log_slot = Some(current_slot); + + let distance = format!( + "{} slots ({})", + sync_distance.as_u64(), + slot_distance_pretty(sync_distance, slot_duration) + ); + + let speed = speedo.slots_per_second(); + let display_speed = speed.is_some_and(|speed| speed != 0.0); + + if display_speed { + info!( + distance, + speed = sync_speed_pretty(speed), + est_time = + estimated_time_pretty(beacon_chain.get_column_da_boundary().and_then( + |da_boundary| speedo.estimated_time_till_slot( + da_boundary.start_slot(T::EthSpec::slots_per_epoch()) + ) + )), + "Downloading historical data columns" + ); + } else { + info!( + distance, + est_time = + estimated_time_pretty(beacon_chain.get_column_da_boundary().and_then( + |da_boundary| speedo.estimated_time_till_slot( + da_boundary.start_slot(T::EthSpec::slots_per_epoch()) + ) + )), + "Downloading historical data columns" + ); + } + } else if !is_custody_backfilling && last_custody_backfill_log_slot.is_some() { + last_custody_backfill_log_slot = None; + original_earliest_data_column_slot = None; + info!("Historical data column download complete"); + } + // Log if we are syncing if current_sync_state.is_syncing() { metrics::set_gauge(&metrics::IS_SYNCED, 0); diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 170012b04b..f6d8dbc157 100644 --- 
a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -478,7 +478,9 @@ pub fn serve( ))) } } - SyncState::SyncTransition | SyncState::BackFillSyncing { .. } => Ok(()), + SyncState::SyncTransition + | SyncState::BackFillSyncing { .. } + | SyncState::CustodyBackFillSyncing { .. } => Ok(()), SyncState::Synced => Ok(()), SyncState::Stalled => Ok(()), } diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 0f5fd99c27..f1a4d87de7 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -60,8 +60,8 @@ pub struct BlobsByRangeRequestId { pub struct DataColumnsByRangeRequestId { /// Id to identify this attempt at a data_columns_by_range request for `parent_request_id` pub id: Id, - /// The Id of the overall By Range request for block components. - pub parent_request_id: ComponentsByRangeRequestId, + /// The Id of the overall By Range request for either a components by range request or a custody backfill request. + pub parent_request_id: DataColumnsByRangeRequester, /// The peer id associated with the request. /// /// This is useful to penalize the peer at a later point if it returned data columns that @@ -69,6 +69,12 @@ pub struct DataColumnsByRangeRequestId { pub peer: PeerId, } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub enum DataColumnsByRangeRequester { + ComponentsByRange(ComponentsByRangeRequestId), + CustodyBackfillSync(CustodyBackFillBatchRequestId), +} + /// Block components by range request for range sync. Includes an ID for downstream consumers to /// handle retries and tie all their sub requests together. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -80,6 +86,24 @@ pub struct ComponentsByRangeRequestId { pub requester: RangeRequestId, } +/// A batch of data columns by range request for custody sync. 
Includes an ID for downstream consumers to +/// handle retries and tie all the range requests for the given epoch together. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct CustodyBackFillBatchRequestId { + /// For each `epoch` we may request the same data in a later retry. This Id identifies the + /// current attempt. + pub id: Id, + pub batch_id: CustodyBackfillBatchId, +} + +/// Custody backfill may be restarted and sync each epoch multiple times in different runs. Identify +/// each batch by epoch and run_id for uniqueness. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct CustodyBackfillBatchId { + pub epoch: Epoch, + pub run_id: u64, +} + /// Range sync chain or backfill batch #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum RangeRequestId { @@ -217,6 +241,8 @@ impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester); impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester); impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id); impl_display!(CustodyId, "{}", requester); +impl_display!(CustodyBackFillBatchRequestId, "{}/{}", id, batch_id); +impl_display!(CustodyBackfillBatchId, "{}/{}", epoch, run_id); impl Display for DataColumnsByRootRequester { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -241,6 +267,15 @@ impl Display for RangeRequestId { } } +impl Display for DataColumnsByRangeRequester { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::ComponentsByRange(id) => write!(f, "ByRange/{id}"), + Self::CustodyBackfillSync(id) => write!(f, "CustodyBackfill/{id}"), + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -263,15 +298,17 @@ mod tests { fn display_id_data_columns_by_range() { let id = DataColumnsByRangeRequestId { id: 123, - parent_request_id: ComponentsByRangeRequestId { - id: 122, - requester: RangeRequestId::RangeSync { - chain_id: 54, - batch_id: Epoch::new(0), + parent_request_id: 
DataColumnsByRangeRequester::ComponentsByRange( + ComponentsByRangeRequestId { + id: 122, + requester: RangeRequestId::RangeSync { + chain_id: 54, + batch_id: Epoch::new(0), + }, }, - }, + ), peer: PeerId::random(), }; - assert_eq!(format!("{id}"), "123/122/RangeSync/0/54"); + assert_eq!(format!("{id}"), "123/ByRange/122/RangeSync/0/54"); } } diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index b8c34f8392..2a3571c3b7 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -4,6 +4,7 @@ use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV3}; use crate::types::{BackFillState, SyncState}; use crate::{Client, Enr, GossipTopic, Multiaddr, NetworkConfig, PeerId}; +use eth2::lighthouse::sync_state::CustodyBackFillState; use network_utils::enr_ext::EnrExt; use parking_lot::RwLock; use std::collections::HashSet; @@ -29,6 +30,8 @@ pub struct NetworkGlobals { pub sync_state: RwLock, /// The current state of the backfill sync. pub backfill_state: RwLock, + /// The current state of custody sync. + pub custody_sync_state: RwLock, /// The computed sampling subnets and columns is stored to avoid re-computing. pub sampling_subnets: RwLock>, /// Network-related configuration. Immutable after initialization. 
@@ -91,6 +94,9 @@ impl NetworkGlobals { gossipsub_subscriptions: RwLock::new(HashSet::new()), sync_state: RwLock::new(SyncState::Stalled), backfill_state: RwLock::new(BackFillState::Paused), + custody_sync_state: RwLock::new(CustodyBackFillState::Pending( + "Custody backfill sync initialized".to_string(), + )), sampling_subnets: RwLock::new(sampling_subnets), config, spec, diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index 0bbbcebaf2..3f57406fc7 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -10,7 +10,7 @@ pub type EnrSyncCommitteeBitfield = BitVector<::SyncCommitteeSu pub type Enr = discv5::enr::Enr; -pub use eth2::lighthouse::sync_state::{BackFillState, SyncState}; +pub use eth2::lighthouse::sync_state::{BackFillState, CustodyBackFillState, SyncState}; pub use globals::NetworkGlobals; pub use pubsub::{PubsubMessage, SnappyTransform}; pub use subnet::{Subnet, SubnetDiscovery}; diff --git a/beacon_node/lighthouse_tracing/src/lib.rs b/beacon_node/lighthouse_tracing/src/lib.rs index 18a9874252..56dccadaa9 100644 --- a/beacon_node/lighthouse_tracing/src/lib.rs +++ b/beacon_node/lighthouse_tracing/src/lib.rs @@ -26,7 +26,9 @@ pub const SPAN_PROCESS_RPC_BLOCK: &str = "process_rpc_block"; pub const SPAN_PROCESS_RPC_BLOBS: &str = "process_rpc_blobs"; pub const SPAN_PROCESS_RPC_CUSTODY_COLUMNS: &str = "process_rpc_custody_columns"; pub const SPAN_PROCESS_CHAIN_SEGMENT: &str = "process_chain_segment"; +pub const SPAN_CUSTODY_BACKFILL_SYNC_BATCH_REQUEST: &str = "custody_backfill_sync_batch_request"; pub const SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL: &str = "process_chain_segment_backfill"; +pub const SPAN_CUSTODY_BACKFILL_SYNC_IMPORT_COLUMNS: &str = "custody_backfill_sync_import_columns"; /// Fork choice root spans pub const SPAN_RECOMPUTE_HEAD: &str = "recompute_head_at_slot"; @@ -73,4 +75,6 @@ pub const LH_BN_ROOT_SPAN_NAMES: &[&str] = &[ 
SPAN_HANDLE_LIGHT_CLIENT_BOOTSTRAP, SPAN_HANDLE_LIGHT_CLIENT_OPTIMISTIC_UPDATE, SPAN_HANDLE_LIGHT_CLIENT_FINALITY_UPDATE, + SPAN_CUSTODY_BACKFILL_SYNC_BATCH_REQUEST, + SPAN_CUSTODY_BACKFILL_SYNC_IMPORT_COLUMNS, ]; diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index a2b5af8b08..cea06a28c8 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -212,6 +212,22 @@ pub static BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL: LazyLock, +> = LazyLock::new(|| { + try_create_int_counter( + "beacon_processor_custody_backfill_column_import_success_total", + "Total number of custody backfill sync columns successfully processed.", + ) +}); +pub static BEACON_PROCESSOR_CUSTODY_BACKFILL_BATCH_FAILED_TOTAL: LazyLock> = + LazyLock::new(|| { + try_create_int_counter( + "beacon_processor_custody_backfill_batch_failed_total", + "Total number of custody backfill batches that failed to be processed.", + ) + }); // Chain segments. pub static BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL: LazyLock> = LazyLock::new(|| { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 85ccde1d59..7441e92871 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -16,6 +16,7 @@ use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, LightClientUpdatesByRangeRequest, }; +use lighthouse_network::service::api_types::CustodyBackfillBatchId; use lighthouse_network::{ Client, MessageId, NetworkGlobals, PeerId, PubsubMessage, rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, @@ -492,6 +493,22 @@ impl NetworkBeaconProcessor { }) } + pub fn send_historic_data_columns( + self: &Arc, + batch_id: CustodyBackfillBatchId, + data_columns: DataColumnSidecarList, + ) -> 
Result<(), Error> { + let processor = self.clone(); + let process_fn = move || processor.process_historic_data_columns(batch_id, data_columns); + + let work = Work::ChainSegmentBackfill(Box::new(process_fn)); + + self.try_send(BeaconWorkEvent { + drop_during_sync: true, + work, + }) + } + /// Create a new work event to import `blocks` as a beacon chain segment. pub fn send_chain_segment( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 1d99540c29..41b12fa01b 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -1,6 +1,7 @@ use crate::metrics::{self, register_process_result_metrics}; use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProcessor}; use crate::sync::BatchProcessResult; +use crate::sync::manager::CustodyBatchProcessResult; use crate::sync::{ ChainId, manager::{BlockProcessType, SyncMessage}, @@ -8,6 +9,7 @@ use crate::sync::{ use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::data_availability_checker::MaybeAvailableBlock; +use beacon_chain::historical_data_columns::HistoricalDataColumnError; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer, validator_monitor::get_slot_delay_ms, @@ -18,15 +20,17 @@ use beacon_processor::{ }; use beacon_processor::{Work, WorkEvent}; use lighthouse_network::PeerAction; +use lighthouse_network::service::api_types::CustodyBackfillBatchId; use lighthouse_tracing::{ - SPAN_PROCESS_CHAIN_SEGMENT, SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL, SPAN_PROCESS_RPC_BLOBS, - SPAN_PROCESS_RPC_BLOCK, SPAN_PROCESS_RPC_CUSTODY_COLUMNS, + SPAN_CUSTODY_BACKFILL_SYNC_IMPORT_COLUMNS, SPAN_PROCESS_CHAIN_SEGMENT, + 
SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL, SPAN_PROCESS_RPC_BLOBS, SPAN_PROCESS_RPC_BLOCK, + SPAN_PROCESS_RPC_CUSTODY_COLUMNS, }; use logging::crit; use std::sync::Arc; use std::time::Duration; use store::KzgCommitment; -use tracing::{debug, error, info, instrument, warn}; +use tracing::{debug, debug_span, error, info, instrument, warn}; use types::beacon_block_body::format_kzg_commitments; use types::blob_sidecar::FixedBlobSidecarList; use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256}; @@ -418,6 +422,103 @@ impl NetworkBeaconProcessor { }); } + pub fn process_historic_data_columns( + &self, + batch_id: CustodyBackfillBatchId, + downloaded_columns: DataColumnSidecarList, + ) { + let _guard = debug_span!( + SPAN_CUSTODY_BACKFILL_SYNC_IMPORT_COLUMNS, + epoch = %batch_id.epoch, + columns_received_count = downloaded_columns.len() + ) + .entered(); + + let sent_columns = downloaded_columns.len(); + let result = match self + .chain + .import_historical_data_column_batch(batch_id.epoch, downloaded_columns) + { + Ok(imported_columns) => { + metrics::inc_counter_by( + &metrics::BEACON_PROCESSOR_CUSTODY_BACKFILL_COLUMN_IMPORT_SUCCESS_TOTAL, + imported_columns as u64, + ); + CustodyBatchProcessResult::Success { + sent_columns, + imported_columns, + } + } + Err(e) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_CUSTODY_BACKFILL_BATCH_FAILED_TOTAL, + ); + let peer_action: Option = match &e { + HistoricalDataColumnError::NoBlockFound { + data_column_block_root, + expected_block_root, + } => { + debug!( + error = "no_block_found", + ?data_column_block_root, + ?expected_block_root, + "Custody backfill batch processing error" + ); + // The peer is faulty if they send blocks with bad roots. + Some(PeerAction::LowToleranceError) + } + HistoricalDataColumnError::MissingDataColumns { .. } => { + warn!( + error = ?e, + "Custody backfill batch processing error", + ); + // The peer is faulty if they don't return data columns + // that they advertised as available. 
+                        Some(PeerAction::LowToleranceError)
+                    }
+                    HistoricalDataColumnError::InvalidKzg => {
+                        warn!(
+                            error = ?e,
+                            "Custody backfill batch processing error",
+                        );
+                        // The peer is faulty if they don't return data columns
+                        // with valid kzg commitments.
+                        Some(PeerAction::LowToleranceError)
+                    }
+                    HistoricalDataColumnError::BeaconChainError(e) => {
+                        match &**e {
+                            beacon_chain::BeaconChainError::FailedColumnCustodyInfoUpdate => {}
+                            _ => {
+                                warn!(
+                                    error = ?e,
+                                    "Custody backfill batch processing error",
+                                );
+                            }
+                        }
+
+                        // This is an internal error, don't penalize the peer
+                        None
+                    }
+                    HistoricalDataColumnError::IndexOutOfBounds => {
+                        error!(
+                            error = ?e,
+                            "Custody backfill batch out of bounds error"
+                        );
+                        // This should never occur, don't penalize the peer.
+                        None
+                    }
+                    HistoricalDataColumnError::StoreError(e) => {
+                        warn!(error = ?e, "Custody backfill batch processing error");
+                        // This is an internal error, don't penalize the peer.
+                        None
+                    }
+                };
+                CustodyBatchProcessResult::Error { peer_action }
+            }
+        };
+        self.send_sync_message(SyncMessage::CustodyBatchProcessed { result, batch_id });
+    }
+
     /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
     /// thread if more blocks are needed to process it.
     #[instrument(
diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs
index 00597586b8..6c0cbd7e55 100644
--- a/beacon_node/network/src/sync/backfill_sync/mod.rs
+++ b/beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -9,24 +9,27 @@
 //! sync as failed, log an error and attempt to retry once a new peer joins the node.
use crate::network_beacon_processor::ChainSegmentProcessId; +use crate::sync::batch::{ + BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, +}; use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::manager::BatchProcessResult; use crate::sync::network_context::{ RangeRequestId, RpcRequestSendError, RpcResponseError, SyncNetworkContext, }; -use crate::sync::range_sync::{ - BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, -}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::service::api_types::Id; use lighthouse_network::types::{BackFillState, NetworkGlobals}; use lighthouse_network::{PeerAction, PeerId}; use logging::crit; +use std::collections::hash_map::DefaultHasher; use std::collections::{ HashSet, btree_map::{BTreeMap, Entry}, }; +use std::hash::{Hash, Hasher}; +use std::marker::PhantomData; use std::sync::Arc; use tracing::{debug, error, info, warn}; use types::{ColumnIndex, Epoch, EthSpec}; @@ -49,21 +52,27 @@ const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 10; /// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 10; -/// Custom configuration for the batch object. -struct BackFillBatchConfig {} +type RpcBlocks = Vec>; -impl BatchConfig for BackFillBatchConfig { +type BackFillBatchInfo = BatchInfo, RpcBlocks>; + +type BackFillSyncBatches = BTreeMap>; + +/// Custom configuration for the batch object. 
+struct BackFillBatchConfig { + marker: PhantomData, +} + +impl BatchConfig for BackFillBatchConfig { fn max_batch_download_attempts() -> u8 { MAX_BATCH_DOWNLOAD_ATTEMPTS } fn max_batch_processing_attempts() -> u8 { MAX_BATCH_PROCESSING_ATTEMPTS } - fn batch_attempt_hash(blocks: &[RpcBlock]) -> u64 { - use std::collections::hash_map::DefaultHasher; - use std::hash::{Hash, Hasher}; + fn batch_attempt_hash(data: &D) -> u64 { let mut hasher = DefaultHasher::new(); - blocks.hash(&mut hasher); + data.hash(&mut hasher); hasher.finish() } } @@ -121,7 +130,7 @@ pub struct BackFillSync { last_batch_downloaded: bool, /// Sorted map of batches undergoing some kind of processing. - batches: BTreeMap>, + batches: BackFillSyncBatches, /// The current processing batch, if any. current_processing_batch: Option, @@ -349,7 +358,7 @@ impl BackFillSync { // reasons. Check that this block belongs to the expected peer // TODO(das): removed peer_id matching as the node may request a different peer for data // columns. - if !batch.is_expecting_block(&request_id) { + if !batch.is_expecting_request_id(&request_id) { return Ok(()); } debug!(batch_epoch = %batch_id, error = ?err, "Batch download failed"); @@ -393,12 +402,13 @@ impl BackFillSync { // sending an error /timeout) if the peer is removed from the chain for other // reasons. 
Check that this block belongs to the expected peer, and that the // request_id matches - if !batch.is_expecting_block(&request_id) { + if !batch.is_expecting_request_id(&request_id) { return Ok(ProcessResult::Successful); } + let received = blocks.len(); match batch.download_completed(blocks, *peer_id) { - Ok(received) => { + Ok(_) => { let awaiting_batches = self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH; debug!( @@ -1050,7 +1060,7 @@ impl BackFillSync { // only request batches up to the buffer size limit // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync // if the current processing window is contained in a long range of skip slots. - let in_buffer = |batch: &BatchInfo| { + let in_buffer = |batch: &BackFillBatchInfo| { matches!( batch.state(), BatchState::Downloading(..) | BatchState::AwaitingProcessing(..) diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/batch.rs similarity index 82% rename from beacon_node/network/src/sync/range_sync/batch.rs rename to beacon_node/network/src/sync/batch.rs index c79800bfbe..ea0ef15f4b 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/batch.rs @@ -2,29 +2,28 @@ use beacon_chain::block_verification_types::RpcBlock; use derivative::Derivative; use lighthouse_network::PeerId; use lighthouse_network::rpc::methods::BlocksByRangeRequest; +use lighthouse_network::rpc::methods::DataColumnsByRangeRequest; use lighthouse_network::service::api_types::Id; use std::collections::HashSet; -use std::fmt; -use std::hash::{Hash, Hasher}; +use std::hash::Hash; +use std::marker::PhantomData; use std::ops::Sub; -use std::time::{Duration, Instant}; +use std::time::Duration; +use std::time::Instant; use strum::Display; -use types::{Epoch, EthSpec, Slot}; +use types::Slot; +use types::{DataColumnSidecarList, Epoch, EthSpec}; -/// The number of times to retry a batch before it is considered 
failed. -const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; - -/// Invalid batches are attempted to be re-downloaded from other peers. If a batch cannot be processed -/// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. -const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; +pub type BatchId = Epoch; /// Type of expected batch. -#[derive(Debug, Copy, Clone, Display)] +#[derive(Debug, Clone, Display)] #[strum(serialize_all = "snake_case")] pub enum ByRangeRequestType { BlocksAndColumns, BlocksAndBlobs, Blocks, + Columns(HashSet), } /// Allows customisation of the above constants used in other sync methods such as BackFillSync. @@ -60,28 +59,10 @@ pub trait BatchConfig { /// Note that simpler hashing functions considered in the past (hash of first block, hash of last /// block, number of received blocks) are not good enough to differentiate attempts. For this /// reason, we hash the complete set of blocks both in RangeSync and BackFillSync. - fn batch_attempt_hash(blocks: &[RpcBlock]) -> u64; + fn batch_attempt_hash(data: &D) -> u64; } #[derive(Debug)] -pub struct RangeSyncBatchConfig {} - -impl BatchConfig for RangeSyncBatchConfig { - fn max_batch_download_attempts() -> u8 { - MAX_BATCH_DOWNLOAD_ATTEMPTS - } - fn max_batch_processing_attempts() -> u8 { - MAX_BATCH_PROCESSING_ATTEMPTS - } - fn batch_attempt_hash(blocks: &[RpcBlock]) -> u64 { - let mut hasher = std::collections::hash_map::DefaultHasher::new(); - blocks.hash(&mut hasher); - hasher.finish() - } -} - -/// Error type of a batch in a wrong state. -// Such errors should never be encountered. pub struct WrongState(pub(crate) String); /// After batch operations, we use this to communicate whether a batch can continue or not @@ -100,28 +81,30 @@ pub enum BatchProcessingResult { #[derive(Derivative)] #[derivative(Debug)] /// A segment of a chain. -pub struct BatchInfo { +pub struct BatchInfo { /// Start slot of the batch. start_slot: Slot, /// End slot of the batch. 
end_slot: Slot, /// The `Attempts` that have been made and failed to send us this batch. - failed_processing_attempts: Vec, + failed_processing_attempts: Vec>, /// Number of processing attempts that have failed but we do not count. non_faulty_processing_attempts: u8, /// The number of download retries this batch has undergone due to a failed request. failed_download_attempts: Vec>, /// State of the batch. - state: BatchState, + state: BatchState, /// Whether this batch contains all blocks or all blocks and blobs. batch_type: ByRangeRequestType, /// Pin the generic #[derivative(Debug = "ignore")] - marker: std::marker::PhantomData, + marker: std::marker::PhantomData<(E, B)>, } -impl fmt::Display for BatchInfo { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +impl std::fmt::Display + for BatchInfo +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, "Start Slot: {}, End Slot: {}, State: {}", @@ -132,21 +115,21 @@ impl fmt::Display for BatchInfo { #[derive(Display)] /// Current state of a batch -pub enum BatchState { +pub enum BatchState { /// The batch has failed either downloading or processing, but can be requested again. AwaitingDownload, /// The batch is being downloaded. Downloading(Id), /// The batch has been completely downloaded and is ready for processing. - AwaitingProcessing(PeerId, Vec>, Instant), + AwaitingProcessing(PeerId, D, Instant), /// The batch is being processed. - Processing(Attempt), + Processing(Attempt), /// The batch was successfully processed and is waiting to be validated. /// /// It is not sufficient to process a batch successfully to consider it correct. This is /// because batches could be erroneously empty, or incomplete. Therefore, a batch is considered /// valid, only if the next sequential batch imports at least a block. - AwaitingValidation(Attempt), + AwaitingValidation(Attempt), /// Intermediate state for inner state handling. 
Poisoned, /// The batch has maxed out the allowed attempts for either downloading or processing. It @@ -154,14 +137,14 @@ pub enum BatchState { Failed, } -impl BatchState { +impl BatchState { /// Helper function for poisoning a state. - pub fn poison(&mut self) -> BatchState { + pub fn poison(&mut self) -> BatchState { std::mem::replace(self, BatchState::Poisoned) } } -impl BatchInfo { +impl BatchInfo { /// Batches are downloaded excluding the first block of the epoch assuming it has already been /// downloaded. /// @@ -178,13 +161,13 @@ impl BatchInfo { pub fn new(start_epoch: &Epoch, num_of_epochs: u64, batch_type: ByRangeRequestType) -> Self { let start_slot = start_epoch.start_slot(E::slots_per_epoch()); let end_slot = start_slot + num_of_epochs * E::slots_per_epoch(); - BatchInfo { + Self { start_slot, end_slot, failed_processing_attempts: Vec::new(), failed_download_attempts: Vec::new(), non_faulty_processing_attempts: 0, - state: BatchState::AwaitingDownload, + state: BatchState::::AwaitingDownload, batch_type, marker: std::marker::PhantomData, } @@ -208,8 +191,8 @@ impl BatchInfo { peers } - /// Verifies if an incoming block belongs to this batch. - pub fn is_expecting_block(&self, request_id: &Id) -> bool { + /// Verifies if an incoming request id to this batch. + pub fn is_expecting_request_id(&self, request_id: &Id) -> bool { if let BatchState::Downloading(expected_id) = &self.state { return expected_id == request_id; } @@ -227,30 +210,6 @@ impl BatchInfo { } } - /// Returns the count of stored pending blocks if in awaiting processing state - pub fn pending_blocks(&self) -> usize { - match &self.state { - BatchState::AwaitingProcessing(_, blocks, _) => blocks.len(), - BatchState::AwaitingDownload - | BatchState::Downloading { .. } - | BatchState::Processing { .. } - | BatchState::AwaitingValidation { .. } - | BatchState::Poisoned - | BatchState::Failed => 0, - } - } - - /// Returns a BlocksByRange request associated with the batch. 
- pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) { - ( - BlocksByRangeRequest::new( - self.start_slot.into(), - self.end_slot.sub(self.start_slot).into(), - ), - self.batch_type, - ) - } - /// After different operations over a batch, this could be in a state that allows it to /// continue, or in failed state. When the batch has failed, we check if it did mainly due to /// processing failures. In this case the batch is considered failed and faulty. @@ -265,27 +224,22 @@ impl BatchInfo { } } - pub fn state(&self) -> &BatchState { + pub fn state(&self) -> &BatchState { &self.state } - pub fn attempts(&self) -> &[Attempt] { + pub fn attempts(&self) -> &[Attempt] { &self.failed_processing_attempts } - /// Marks the batch as ready to be processed if the blocks are in the range. The number of - /// received blocks is returned, or the wrong batch end on failure + /// Marks the batch as ready to be processed if the data columns are in the range. The number of + /// received columns is returned, or the wrong batch end on failure #[must_use = "Batch may have failed"] - pub fn download_completed( - &mut self, - blocks: Vec>, - peer: PeerId, - ) -> Result { + pub fn download_completed(&mut self, data_columns: D, peer: PeerId) -> Result<(), WrongState> { match self.state.poison() { BatchState::Downloading(_) => { - let received = blocks.len(); - self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now()); - Ok(received) + self.state = BatchState::AwaitingProcessing(peer, data_columns, Instant::now()); + Ok(()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { @@ -376,17 +330,17 @@ impl BatchInfo { } } - pub fn start_processing(&mut self) -> Result<(Vec>, Duration), WrongState> { + pub fn start_processing(&mut self) -> Result<(D, Duration), WrongState> { match self.state.poison() { - BatchState::AwaitingProcessing(peer, blocks, start_instant) => { - self.state = BatchState::Processing(Attempt::new::(peer, 
&blocks)); - Ok((blocks, start_instant.elapsed())) + BatchState::AwaitingProcessing(peer, data_columns, start_instant) => { + self.state = BatchState::Processing(Attempt::new::(peer, &data_columns)); + Ok((data_columns, start_instant.elapsed())) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { self.state = other; Err(WrongState(format!( - "Starting procesing batch in wrong state {:?}", + "Starting processing batch in wrong state {:?}", self.state ))) } @@ -466,37 +420,86 @@ impl BatchInfo { } } -/// Represents a peer's attempt and providing the result for this batch. -/// -/// Invalid attempts will downscore a peer. -#[derive(PartialEq, Debug)] -pub struct Attempt { +// BatchInfo implementations for RangeSync +impl BatchInfo>> { + /// Returns a BlocksByRange request associated with the batch. + pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) { + ( + BlocksByRangeRequest::new( + self.start_slot.into(), + self.end_slot.sub(self.start_slot).into(), + ), + self.batch_type.clone(), + ) + } + + /// Returns the count of stored pending blocks if in awaiting processing state + pub fn pending_blocks(&self) -> usize { + match &self.state { + BatchState::AwaitingProcessing(_, blocks, _) => blocks.len(), + BatchState::AwaitingDownload + | BatchState::Downloading { .. } + | BatchState::Processing { .. } + | BatchState::AwaitingValidation { .. } + | BatchState::Poisoned + | BatchState::Failed => 0, + } + } +} + +// BatchInfo implementation for CustodyBackFillSync +impl BatchInfo> { + /// Returns a DataColumnsByRange request associated with the batch. 
+ pub fn to_data_columns_by_range_request( + &self, + ) -> Result { + match &self.batch_type { + ByRangeRequestType::Columns(columns) => Ok(DataColumnsByRangeRequest { + start_slot: self.start_slot.into(), + count: self.end_slot.sub(self.start_slot).into(), + columns: columns.clone().into_iter().collect(), + }), + _ => Err(WrongState( + "Custody backfill sync can only make data columns by range requests.".to_string(), + )), + } + } +} + +#[derive(Debug)] +pub struct Attempt { /// The peer that made the attempt. pub peer_id: PeerId, /// The hash of the blocks of the attempt. pub hash: u64, + /// Pin the generic. + marker: PhantomData, } -impl Attempt { - fn new(peer_id: PeerId, blocks: &[RpcBlock]) -> Self { - let hash = B::batch_attempt_hash(blocks); - Attempt { peer_id, hash } +impl Attempt { + fn new(peer_id: PeerId, data: &D) -> Self { + let hash = B::batch_attempt_hash(data); + Attempt { + peer_id, + hash, + marker: PhantomData, + } } } -impl std::fmt::Debug for BatchState { +impl std::fmt::Debug for BatchState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - BatchState::Processing(Attempt { peer_id, hash: _ }) => { + BatchState::Processing(Attempt { peer_id, .. }) => { write!(f, "Processing({})", peer_id) } - BatchState::AwaitingValidation(Attempt { peer_id, hash: _ }) => { + BatchState::AwaitingValidation(Attempt { peer_id, .. }) => { write!(f, "AwaitingValidation({})", peer_id) } BatchState::AwaitingDownload => f.write_str("AwaitingDownload"), BatchState::Failed => f.write_str("Failed"), - BatchState::AwaitingProcessing(peer, blocks, _) => { - write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len()) + BatchState::AwaitingProcessing(peer, ..) 
=> { + write!(f, "AwaitingProcessing({})", peer) } BatchState::Downloading(request_id) => { write!(f, "Downloading({})", request_id) @@ -506,7 +509,7 @@ impl std::fmt::Debug for BatchState { } } -impl BatchState { +impl BatchState { /// Creates a character representation/visualization for the batch state to display in logs for quicker and /// easier recognition fn visualize(&self) -> char { diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index ba89d11225..cd9276f7e3 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -36,7 +36,7 @@ pub struct RangeBlockComponentsRequest { pub(crate) request_span: Span, } -enum ByRangeRequest { +pub enum ByRangeRequest { Active(I), Complete(T), } @@ -435,7 +435,7 @@ impl RangeBlockComponentsRequest { } impl ByRangeRequest { - fn finish(&mut self, id: I, data: T) -> Result<(), String> { + pub fn finish(&mut self, id: I, data: T) -> Result<(), String> { match self { Self::Active(expected_id) => { if expected_id != &id { @@ -448,7 +448,7 @@ impl ByRangeRequest { } } - fn to_finished(&self) -> Option<&T> { + pub fn to_finished(&self) -> Option<&T> { match self { Self::Active(_) => None, Self::Complete(data) => Some(data), @@ -467,7 +467,7 @@ mod tests { PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, - DataColumnsByRangeRequestId, Id, RangeRequestId, + DataColumnsByRangeRequestId, DataColumnsByRangeRequester, Id, RangeRequestId, }, }; use rand::SeedableRng; @@ -501,7 +501,7 @@ mod tests { fn columns_id( id: Id, - parent_request_id: ComponentsByRangeRequestId, + parent_request_id: DataColumnsByRangeRequester, ) -> DataColumnsByRangeRequestId { DataColumnsByRangeRequestId { id, @@ -598,7 +598,15 @@ mod tests { let columns_req_id = expects_custody_columns .iter() .enumerate() - .map(|(i, column)| (columns_id(i as Id, 
components_id), vec![*column])) + .map(|(i, column)| { + ( + columns_id( + i as Id, + DataColumnsByRangeRequester::ComponentsByRange(components_id), + ), + vec![*column], + ) + }) .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, @@ -657,7 +665,15 @@ mod tests { let columns_req_id = batched_column_requests .iter() .enumerate() - .map(|(i, columns)| (columns_id(i as Id, components_id), columns.clone())) + .map(|(i, columns)| { + ( + columns_id( + i as Id, + DataColumnsByRangeRequester::ComponentsByRange(components_id), + ), + columns.clone(), + ) + }) .collect::>(); let mut info = RangeBlockComponentsRequest::::new( @@ -738,7 +754,15 @@ mod tests { let columns_req_id = expected_custody_columns .iter() .enumerate() - .map(|(i, column)| (columns_id(i as Id, components_id), vec![*column])) + .map(|(i, column)| { + ( + columns_id( + i as Id, + DataColumnsByRangeRequester::ComponentsByRange(components_id), + ), + vec![*column], + ) + }) .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, @@ -816,7 +840,15 @@ mod tests { let columns_req_id = expected_custody_columns .iter() .enumerate() - .map(|(i, column)| (columns_id(i as Id, components_id), vec![*column])) + .map(|(i, column)| { + ( + columns_id( + i as Id, + DataColumnsByRangeRequester::ComponentsByRange(components_id), + ), + vec![*column], + ) + }) .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, @@ -852,7 +884,10 @@ mod tests { assert!(result.is_err()); // AND: We retry with a new peer for the failed column - let new_columns_req_id = columns_id(10 as Id, components_id); + let new_columns_req_id = columns_id( + 10 as Id, + DataColumnsByRangeRequester::ComponentsByRange(components_id), + ); let failed_column_requests = vec![(new_columns_req_id, vec![2])]; info.reinsert_failed_column_requests(failed_column_requests) .unwrap(); @@ -898,7 +933,15 @@ mod tests { let columns_req_id = expected_custody_columns .iter() .enumerate() - 
.map(|(i, column)| (columns_id(i as Id, components_id), vec![*column])) + .map(|(i, column)| { + ( + columns_id( + i as Id, + DataColumnsByRangeRequester::ComponentsByRange(components_id), + ), + vec![*column], + ) + }) .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, diff --git a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs new file mode 100644 index 0000000000..69df3422e6 --- /dev/null +++ b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs @@ -0,0 +1,1126 @@ +use std::{ + collections::{BTreeMap, HashSet, btree_map::Entry}, + marker::PhantomData, + sync::Arc, +}; + +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use lighthouse_network::{ + NetworkGlobals, PeerAction, PeerId, + service::api_types::{CustodyBackFillBatchRequestId, CustodyBackfillBatchId}, + types::CustodyBackFillState, +}; +use lighthouse_tracing::SPAN_CUSTODY_BACKFILL_SYNC_BATCH_REQUEST; +use logging::crit; +use std::hash::{DefaultHasher, Hash, Hasher}; +use tracing::{debug, error, info, info_span, warn}; +use types::{DataColumnSidecarList, Epoch, EthSpec}; + +use crate::sync::{ + backfill_sync::{BACKFILL_EPOCHS_PER_BATCH, ProcessResult, SyncStart}, + batch::{ + BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, + ByRangeRequestType, + }, + block_sidecar_coupling::CouplingError, + manager::CustodyBatchProcessResult, + network_context::{RpcResponseError, SyncNetworkContext}, +}; + +/// The maximum number of batches to queue before requesting more. +const BACKFILL_BATCH_BUFFER_SIZE: u8 = 5; + +/// Columns are downloaded in batches from peers. This constant specifies how many epochs worth of +/// columns per batch are requested _at most_. A batch may request less columns to account for +/// already requested columns. There is a timeout for each batch request. If this value is too high, +/// we will negatively report peers with poor bandwidth. 
This can be set arbitrarily high, in which +/// case the responder will fill the response up to the max request size, assuming they have the +/// bandwidth to do so. +pub const CUSTODY_BACKFILL_EPOCHS_PER_BATCH: u64 = 1; + +type CustodyBackFillBatchInfo = + BatchInfo, DataColumnSidecarList>; +type CustodyBackFillBatches = BTreeMap>; + +#[derive(Debug)] +pub struct CustodyBackFillBatchConfig { + marker: PhantomData, +} + +impl BatchConfig for CustodyBackFillBatchConfig { + fn max_batch_download_attempts() -> u8 { + 5 + } + fn max_batch_processing_attempts() -> u8 { + 5 + } + fn batch_attempt_hash(data: &D) -> u64 { + let mut hasher = DefaultHasher::new(); + data.hash(&mut hasher); + hasher.finish() + } +} + +/// The ways a custody backfill sync can fail. +// The info in the enum variants is displayed in logging, clippy thinks it's dead code. +#[derive(Debug)] +pub enum CustodyBackfillError { + /// A batch failed to be downloaded. + BatchDownloadFailed(#[allow(dead_code)] BatchId), + /// A batch could not be processed. + BatchProcessingFailed(#[allow(dead_code)] BatchId), + /// A batch entered an invalid state. + BatchInvalidState(#[allow(dead_code)] BatchId, #[allow(dead_code)] String), + /// The sync algorithm entered an invalid state. + InvalidSyncState(#[allow(dead_code)] String), + /// The chain became paused. + Paused, +} + +pub struct CustodyBackFillSync { + /// Keeps track of the current progress of the custody backfill. + /// This only gets refreshed from the beacon chain if we enter a failed state. + current_start: BatchId, + + /// Starting epoch of the batch that needs to be processed next. + /// This is incremented as the chain advances. + processing_target: BatchId, + + /// The custody group count we are trying to fulfill up to the DA window. + /// This is used as an indicator to restart custody backfill sync if the cgc + /// was changed in the middle of a currently active sync. + cgc: u64, + + /// Run ID of this backfill process. 
Increments if sync restarts. Used to differentiate batch + /// results from different runs. + run_id: u64, + + /// Starting epoch of the next batch that needs to be downloaded. + to_be_downloaded: BatchId, + + /// Keeps track if we have requested the final batch. + last_batch_downloaded: bool, + + /// Sorted map of batches undergoing some kind of processing. + batches: CustodyBackFillBatches, + + /// The current processing batch, if any. + current_processing_batch: Option, + + /// Batches validated. + validated_batches: u64, + + /// These are batches that we've skipped because we have no columns to fetch for the epoch. + skipped_batches: HashSet, + + /// When a custody backfill sync fails, we keep track of whether a new fully synced peer has joined. + /// This signifies that we are able to attempt to restart a failed chain. + restart_failed_sync: bool, + + /// Reference to the beacon chain to obtain initial starting points for custody backfill sync. + beacon_chain: Arc>, + + /// Reference to the network globals in order to obtain valid peers to backfill columns from + /// (i.e synced peers). + network_globals: Arc>, +} + +impl CustodyBackFillSync { + pub fn new( + beacon_chain: Arc>, + network_globals: Arc>, + ) -> Self { + Self { + current_start: Epoch::new(0), + processing_target: Epoch::new(0), + cgc: 0, + run_id: 0, + to_be_downloaded: Epoch::new(0), + last_batch_downloaded: false, + batches: BTreeMap::new(), + skipped_batches: HashSet::new(), + current_processing_batch: None, + validated_batches: 0, + restart_failed_sync: false, + beacon_chain, + network_globals, + } + } + + /// Pauses the custody sync if it's currently syncing. 
+ pub fn pause(&mut self, reason: String) { + if let CustodyBackFillState::Syncing = self.state() { + debug!(processed_epochs = %self.validated_batches, to_be_processed = %self.current_start,"Custody backfill sync paused"); + self.set_state(CustodyBackFillState::Pending(reason)); + } + } + + /// Checks if custody backfill sync should start and sets the missing columns + /// custody backfill sync will attempt to fetch. + /// The criteria to start custody sync is: + /// - The earliest data column epoch's custodied columns != previous epoch's custodied columns + /// - The earliest data column epoch is a finalized epoch + pub fn should_start_custody_backfill_sync(&mut self) -> bool { + let Some(da_boundary_epoch) = self.beacon_chain.get_column_da_boundary() else { + return false; + }; + + // This is the epoch in which we have met our current custody requirements + let Some(earliest_data_column_epoch) = + self.beacon_chain.earliest_custodied_data_column_epoch() + else { + return false; + }; + + // Check if we have missing columns between the da boundary and `earliest_data_column_epoch` + let missing_columns = self + .beacon_chain + .get_missing_columns_for_epoch(da_boundary_epoch); + + if !missing_columns.is_empty() { + let latest_finalized_epoch = self + .beacon_chain + .canonical_head + .cached_head() + .finalized_checkpoint() + .epoch; + + // Check that the earliest data column epoch is a finalized epoch. + return earliest_data_column_epoch <= latest_finalized_epoch; + } + + false + } + + fn restart_sync(&mut self) { + // Set state to paused + self.set_state(CustodyBackFillState::Pending( + "CGC count has changed and custody backfill sync needs to restart".to_string(), + )); + + // Remove all batches and active requests. 
+ self.batches.clear(); + self.skipped_batches.clear(); + self.restart_failed_sync = false; + + // Reset all downloading and processing targets + // NOTE: Lets keep validated_batches for posterity + self.processing_target = Epoch::new(0); + self.to_be_downloaded = Epoch::new(0); + self.last_batch_downloaded = false; + self.current_processing_batch = None; + self.validated_batches = 0; + self.run_id += 1; + + self.set_start_epoch(); + self.set_cgc(); + } + + fn restart_if_required(&mut self) -> bool { + let cgc_at_head = self + .beacon_chain + .data_availability_checker + .custody_context() + .custody_group_count_at_head(&self.beacon_chain.spec); + + if cgc_at_head != self.cgc { + self.restart_sync(); + return true; + } + + false + } + + /// Starts syncing. + #[must_use = "A failure here indicates custody backfill sync has failed and the global sync state should be updated"] + pub fn start( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result { + match self.state() { + CustodyBackFillState::Syncing => { + if self.restart_if_required() { + return Ok(SyncStart::NotSyncing); + } + + if self.check_completed() { + self.set_state(CustodyBackFillState::Completed); + return Ok(SyncStart::NotSyncing); + } + } + CustodyBackFillState::Pending(_) | CustodyBackFillState::Completed => { + if self.check_completed() { + self.set_state(CustodyBackFillState::Completed); + return Ok(SyncStart::NotSyncing); + } + self.set_cgc(); + + if !self.should_start_custody_backfill_sync() { + return Ok(SyncStart::NotSyncing); + } + self.set_start_epoch(); + if self + .network_globals + .peers + .read() + .synced_peers() + .next() + .is_some() + { + debug!( + run_id = self.run_id, + current_start = %self.current_start, + processing_target = %self.processing_target, + to_be_downloaded = %self.to_be_downloaded, + "Starting custody backfill sync" + ); + // If there are peers to resume with, begin the resume. 
+ self.set_state(CustodyBackFillState::Syncing); + // Resume any previously failed batches. + self.resume_batches(network)?; + // begin requesting blocks from the peer pool, until all peers are exhausted. + self.request_batches(network)?; + + // start processing batches if needed + self.process_completed_batches(network)?; + } else { + return Ok(SyncStart::NotSyncing); + } + } + } + + let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + return Ok(SyncStart::NotSyncing); + }; + + Ok(SyncStart::Syncing { + completed: (self.validated_batches + * CUSTODY_BACKFILL_EPOCHS_PER_BATCH + * T::EthSpec::slots_per_epoch()) as usize, + remaining: self + .current_start + .end_slot(T::EthSpec::slots_per_epoch()) + .saturating_sub(column_da_boundary.start_slot(T::EthSpec::slots_per_epoch())) + .as_usize(), + }) + } + + fn set_cgc(&mut self) { + self.cgc = self + .beacon_chain + .data_availability_checker + .custody_context() + .custody_group_count_at_head(&self.beacon_chain.spec); + } + + fn set_start_epoch(&mut self) { + let earliest_data_column_epoch = self + .beacon_chain + .earliest_custodied_data_column_epoch() + .unwrap_or(Epoch::new(0)); + + self.current_start = earliest_data_column_epoch + 1; + self.processing_target = self.current_start; + self.to_be_downloaded = self.current_start; + } + + /// Attempts to request the next required batches from the peer pool. It will exhaust the peer + /// pool and left over batches until the batch buffer is reached or all peers are exhausted. 
+ fn request_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result<(), CustodyBackfillError> { + if !matches!(self.state(), CustodyBackFillState::Syncing) { + return Ok(()); + } + + // find the next pending batch and request it from the peer + // Note: for this function to not infinite loop we must: + // - If `include_next_batch` returns Some we MUST increase the count of batches that are + // accounted in the `BACKFILL_BATCH_BUFFER_SIZE` limit in the `matches!` statement of + // that function. + while let Some(batch_id) = self.include_next_batch() { + // send the batch + self.send_batch(network, batch_id)?; + } + + // No more batches, simply stop + Ok(()) + } + + /// When resuming a chain, this function searches for batches that need to be re-downloaded and + /// transitions their state to redownload the batch. + fn resume_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result<(), CustodyBackfillError> { + let batch_ids_to_retry = self + .batches + .iter() + .filter_map(|(batch_id, batch)| { + // In principle there should only ever be one of these, and we could terminate the + // loop early, however the processing is negligible and we continue the search + // for robustness to handle potential future modification + if matches!(batch.state(), BatchState::AwaitingDownload) { + Some(*batch_id) + } else { + None + } + }) + .collect::>(); + + for batch_id in batch_ids_to_retry { + self.send_batch(network, batch_id)?; + } + Ok(()) + } + + /// Creates the next required batch from the chain. If there are no more batches required, + /// `None` is returned. + fn include_next_batch(&mut self) -> Option { + let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + return None; + }; + + let mut missing_columns = HashSet::new(); + + // Skip all batches (Epochs) that don't have missing columns. 
+ for epoch in Epoch::range_inclusive_rev(self.to_be_downloaded, column_da_boundary) { + missing_columns = self.beacon_chain.get_missing_columns_for_epoch(epoch); + + if !missing_columns.is_empty() { + self.to_be_downloaded = epoch; + break; + } + + // This batch is being skipped, insert it into the skipped batches mapping. + self.skipped_batches.insert(epoch); + + if epoch == column_da_boundary { + return None; + } + } + + // Don't request batches before the column da boundary + if self.to_be_downloaded < column_da_boundary { + return None; + } + + // Don't request batches beyond the DA window + if self.last_batch_downloaded { + return None; + } + + // Only request batches up to the buffer size limit + // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync + // if the current processing window is contained in a long range of skip slots. + let in_buffer = |batch: &CustodyBackFillBatchInfo| { + matches!( + batch.state(), + BatchState::Downloading(..) | BatchState::AwaitingProcessing(..) 
+ ) + }; + if self + .batches + .iter() + .filter(|&(_epoch, batch)| in_buffer(batch)) + .count() + > BACKFILL_BATCH_BUFFER_SIZE as usize + { + return None; + } + + let batch_id = self.to_be_downloaded; + + match self.batches.entry(batch_id) { + Entry::Occupied(_) => { + // this batch doesn't need downloading, let this same function decide the next batch + if self.would_complete(batch_id) { + self.last_batch_downloaded = true; + } + + self.to_be_downloaded = self + .to_be_downloaded + .saturating_sub(CUSTODY_BACKFILL_EPOCHS_PER_BATCH); + self.include_next_batch() + } + Entry::Vacant(entry) => { + entry.insert(BatchInfo::new( + &batch_id, + CUSTODY_BACKFILL_EPOCHS_PER_BATCH, + ByRangeRequestType::Columns(missing_columns), + )); + if self.would_complete(batch_id) { + self.last_batch_downloaded = true; + } + self.to_be_downloaded = self + .to_be_downloaded + .saturating_sub(CUSTODY_BACKFILL_EPOCHS_PER_BATCH); + Some(batch_id) + } + } + } + + /// Processes the batch with the given id. + /// The batch must exist and be ready for processing + fn process_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + ) -> Result { + // Check if we need to restart custody backfill sync due to a recent cgc change + if self.restart_if_required() { + return Ok(ProcessResult::Successful); + } + + if self.state() != CustodyBackFillState::Syncing || self.current_processing_batch.is_some() + { + return Ok(ProcessResult::Successful); + } + + let Some(batch) = self.batches.get_mut(&batch_id) else { + return self + .fail_sync(CustodyBackfillError::InvalidSyncState(format!( + "Trying to process a batch that does not exist: {}", + batch_id + ))) + .map(|_| ProcessResult::Successful); + }; + + let (data_columns, _) = match batch.start_processing() { + Err(e) => { + return self + .fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0)) + .map(|_| ProcessResult::Successful); + } + Ok(v) => v, + }; + + self.current_processing_batch = Some(batch_id); + + if let 
Err(e) = network.beacon_processor().send_historic_data_columns( + CustodyBackfillBatchId { + epoch: batch_id, + run_id: self.run_id, + }, + data_columns, + ) { + crit!( + msg = "process_batch", + error = %e, + batch = ?self.processing_target, + "Failed to send data columns to processor." + ); + // This is unlikely to happen but it would stall syncing since the batch now has no + // data columns to continue, and the chain is expecting a processing result that won't + // arrive. To mitigate this, (fake) fail this processing so that the batch is + // re-downloaded. + self.on_batch_process_result( + network, + CustodyBackfillBatchId { + epoch: batch_id, + run_id: self.run_id, + }, + &CustodyBatchProcessResult::Error { peer_action: None }, + ) + } else { + Ok(ProcessResult::Successful) + } + } + + /// A data column has been received for a batch. + /// If the column correctly completes the batch it will be processed if possible. + /// If this returns an error, custody sync has failed and will be restarted once new peers + /// join the system. + /// The sync manager should update the global sync state on failure. + #[must_use = "A failure here indicates custody backfill sync has failed and the global sync state should be updated"] + pub fn on_data_column_response( + &mut self, + network: &mut SyncNetworkContext, + req_id: CustodyBackFillBatchRequestId, + peer_id: &PeerId, + resp: Result, RpcResponseError>, + ) -> Result { + if req_id.batch_id.run_id != self.run_id { + debug!(%req_id, "Ignoring custody backfill download response from different run_id"); + return Ok(ProcessResult::Successful); + } + + let batch_id = req_id.batch_id.epoch; + // check if we have this batch + let Some(batch) = self.batches.get_mut(&batch_id) else { + if !matches!(self.state(), CustodyBackFillState::Pending(_)) { + // A batch might get removed when custody sync advances, so this is non fatal. 
+ debug!(epoch = %batch_id, "Received a column for unknown batch"); + } + return Ok(ProcessResult::Successful); + }; + + // A batch could be retried without the peer failing the request (disconnecting/ + // sending an error /timeout) if the peer is removed for other + // reasons. Check that this column belongs to the expected peer, and that the + // request_id matches + if !batch.is_expecting_request_id(&req_id.id) { + return Ok(ProcessResult::Successful); + } + + match resp { + Ok(data_columns) => { + let received = data_columns.len(); + + match batch.download_completed(data_columns, *peer_id) { + Ok(_) => { + let awaiting_batches = self.processing_target.saturating_sub(batch_id) + / CUSTODY_BACKFILL_EPOCHS_PER_BATCH; + debug!( + %req_id, + blocks = received, + %awaiting_batches, + "Completed batch received" + ); + + // pre-emptively request more columns from peers whilst we process current columns. + self.request_batches(network)?; + self.process_completed_batches(network) + } + Err(e) => { + self.fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0))?; + Ok(ProcessResult::Successful) + } + } + } + Err(err) => { + debug!(batch_epoch = %batch_id, error = ?err, "Batch download failed"); + + // If there are any coupling errors, penalize the appropriate peers + if let RpcResponseError::BlockComponentCouplingError(coupling_error) = err + && let CouplingError::DataColumnPeerFailure { + error, + faulty_peers, + exceeded_retries: _, + } = coupling_error + { + for (column_index, faulty_peer) in faulty_peers { + debug!( + ?error, + ?column_index, + ?faulty_peer, + "Custody backfill sync penalizing peer" + ); + network.report_peer( + faulty_peer, + PeerAction::LowToleranceError, + "Peer failed to serve column", + ); + } + } + + match batch.download_failed(Some(*peer_id)) { + Err(e) => { + self.fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0))?; + } + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + 
self.fail_sync(CustodyBackfillError::BatchDownloadFailed(batch_id))?; + } + Ok(BatchOperationOutcome::Continue) => { + self.send_batch(network, batch_id)?; + } + } + Ok(ProcessResult::Successful) + } + } + } + + /// The beacon processor has completed processing a batch. This function handles the result + /// of the batch processor. + /// If an error is returned custody backfill sync has failed. + #[must_use = "A failure here indicates custody backfill sync has failed and the global sync state should be updated"] + pub fn on_batch_process_result( + &mut self, + network: &mut SyncNetworkContext, + custody_batch_id: CustodyBackfillBatchId, + result: &CustodyBatchProcessResult, + ) -> Result { + let batch_id = custody_batch_id.epoch; + if custody_batch_id.run_id != self.run_id { + debug!(batch = %custody_batch_id, "Ignoring custody backfill error from different run_id"); + return Ok(ProcessResult::Successful); + } + + // The first two cases are possible in regular sync, should not occur in custody backfill, but we + // keep this logic for handling potential processing race conditions. + // result + let batch = match &self.current_processing_batch { + Some(processing_id) if *processing_id != batch_id => { + debug!( + batch_epoch = %batch_id, + expected_batch_epoch = processing_id.as_u64(), + "Unexpected batch result" + ); + return Ok(ProcessResult::Successful); + } + None => { + debug!(%batch_id, "Chain was not expecting a batch result"); + return Ok(ProcessResult::Successful); + } + _ => { + // batch_id matches, continue + self.current_processing_batch = None; + + match self.batches.get_mut(&batch_id) { + Some(batch) => batch, + None => { + // This is an error. Fail the sync algorithm. 
+ return self + .fail_sync(CustodyBackfillError::InvalidSyncState(format!( + "Current processing batch not found: {}", + batch_id + ))) + .map(|_| ProcessResult::Successful); + } + } + } + }; + + let Some(peer) = batch.processing_peer() else { + self.fail_sync(CustodyBackfillError::BatchInvalidState( + batch_id, + String::from("Peer does not exist"), + ))?; + return Ok(ProcessResult::Successful); + }; + + debug!( + ?result, + batch_id = %custody_batch_id, + %peer, + client = %network.client_type(peer), + "Custody backfill batch processed" + ); + + match result { + CustodyBatchProcessResult::Success { + imported_columns, .. + } => { + if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) { + self.fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0))?; + } + + debug!(imported_count=?imported_columns, "Succesfully imported historical data columns"); + + self.advance_custody_backfill_sync(batch_id); + + let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + return Err(CustodyBackfillError::InvalidSyncState( + "Can't calculate column data availability boundary".to_string(), + )); + }; + + if batch_id == self.processing_target { + // Advance processing target to the previous epoch + // If the current processing target is above the column DA boundary + if self.processing_target > column_da_boundary { + self.processing_target = self + .processing_target + .saturating_sub(CUSTODY_BACKFILL_EPOCHS_PER_BATCH); + } + } + + // check if custody sync has completed syncing up to the DA window + if self.check_completed() { + info!( + validated_epochs = ?self.validated_batches, + run_id = self.run_id, + "Custody backfill sync completed" + ); + self.batches.clear(); + self.restart_failed_sync = false; + self.processing_target = self.current_start; + self.to_be_downloaded = self.current_start; + self.last_batch_downloaded = false; + self.current_processing_batch = None; + self.validated_batches = 0; + 
self.skipped_batches.clear(); + self.set_state(CustodyBackFillState::Completed); + self.beacon_chain.update_data_column_custody_info(None); + Ok(ProcessResult::SyncCompleted) + } else { + // custody sync is not completed + // attempt to request more batches + self.request_batches(network)?; + // attempt to process more batches + self.process_completed_batches(network) + } + } + CustodyBatchProcessResult::Error { peer_action } => { + match peer_action { + // Faulty failure + Some(peer_action) => { + match batch.processing_completed(BatchProcessingResult::FaultyFailure) { + Err(e) => { + // Batch was in the wrong state + self.fail_sync(CustodyBackfillError::BatchInvalidState( + batch_id, e.0, + )) + .map(|_| ProcessResult::Successful) + } + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + warn!( + score_adjustment = ?peer_action, + batch_epoch = %batch_id, + "Custody backfill batch failed to download. Penalizing peers" + ); + self.fail_sync(CustodyBackfillError::BatchProcessingFailed( + batch_id, + )) + .map(|_| ProcessResult::Successful) + } + + Ok(BatchOperationOutcome::Continue) => { + self.advance_custody_backfill_sync(batch_id); + // Handle this invalid batch, that is within the re-process retries limit. + self.handle_invalid_batch(network, batch_id) + .map(|_| ProcessResult::Successful) + } + } + } + // Non faulty failure + None => { + if let Err(e) = + batch.processing_completed(BatchProcessingResult::NonFaultyFailure) + { + self.fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0))?; + } + self.send_batch(network, batch_id)?; + Ok(ProcessResult::Successful) + } + } + } + } + } + + /// Processes the next ready batch. 
+ fn process_completed_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result { + // Only process batches if custody backfill is syncing and only process one batch at a time + if self.state() != CustodyBackFillState::Syncing || self.current_processing_batch.is_some() + { + return Ok(ProcessResult::Successful); + } + + // Don't try to process batches before the Fulu fork epoch since data columns don't exist + if let Some(fulu_fork_epoch) = self.beacon_chain.spec.fulu_fork_epoch + && self.processing_target < fulu_fork_epoch + { + return Ok(ProcessResult::Successful); + } + + // Check if we need to restart custody backfill sync due to a cgc change. + if self.restart_if_required() { + return Ok(ProcessResult::Successful); + } + + while self.skipped_batches.contains(&self.processing_target) { + self.skipped_batches.remove(&self.processing_target); + // Update data column custody info with the skipped batch + if let Err(e) = self + .beacon_chain + .safely_backfill_data_column_custody_info(self.processing_target) + { + // I can't see a scenario where this could happen, but if we don't + // handle this edge case custody backfill sync could be stuck indefinitely. + error!( + error=?e, + "Unable to update data column custody info, restarting sync" + ); + self.restart_sync(); + }; + self.processing_target -= BACKFILL_EPOCHS_PER_BATCH; + } + + // Find the id of the batch we are going to process. + if let Some(batch) = self.batches.get(&self.processing_target) { + let state = batch.state(); + match state { + BatchState::AwaitingProcessing(..) => { + return self.process_batch(network, self.processing_target); + } + BatchState::Downloading(..) => { + // Batch is not ready, nothing to process + } + // Batches can be in `AwaitingDownload` state if there weren't good data column subnet + // peers to send the request to. + BatchState::AwaitingDownload => return Ok(ProcessResult::Successful), + BatchState::AwaitingValidation(..) 
=> { + // The batch is validated + } + BatchState::Poisoned => unreachable!("Poisoned batch"), + BatchState::Failed | BatchState::Processing(_) => { + // these are all inconsistent states: + // - Failed -> non recoverable batch. Columns should have been removed + // - AwaitingDownload -> A recoverable failed batch should have been + // re-requested. + // - Processing -> `self.current_processing_batch` is None + self.fail_sync(CustodyBackfillError::InvalidSyncState(String::from( + "Invalid expected batch state", + )))?; + return Ok(ProcessResult::Successful); + } + } + } else { + self.fail_sync(CustodyBackfillError::InvalidSyncState(format!( + "Batch not found for current processing target {}", + self.processing_target + )))?; + return Ok(ProcessResult::Successful); + } + Ok(ProcessResult::Successful) + } + + /// Removes any batches previous to the given `validating_epoch` and advance custody backfill sync + /// to `validating_epoch`. + /// + /// The `validating_epoch` must align with batch boundaries. + fn advance_custody_backfill_sync(&mut self, validating_epoch: Epoch) { + let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + return; + }; + // make sure this epoch produces an advancement, unless its at the column DA boundary + if validating_epoch >= self.current_start && validating_epoch > column_da_boundary { + return; + } + + // We can now validate higher batches than the current batch. Here we remove all + // batches that are higher than the current batch. We add on an extra + // `BACKFILL_EPOCHS_PER_BATCH` as `split_off` is inclusive. + let removed_batches = self + .batches + .split_off(&(validating_epoch + CUSTODY_BACKFILL_EPOCHS_PER_BATCH)); + + for (id, batch) in removed_batches.into_iter() { + self.validated_batches = self.validated_batches.saturating_add(1); + match batch.state() { + BatchState::Downloading(..) | BatchState::AwaitingValidation(..) 
=> {} + BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => { + crit!("Batch indicates inconsistent data columns while advancing custody sync") + } + BatchState::AwaitingProcessing(..) => {} + BatchState::Processing(_) => { + debug!(batch = %id, %batch, "Advancing custody sync while processing a batch"); + if let Some(processing_id) = self.current_processing_batch + && id >= processing_id + { + self.current_processing_batch = None; + } + } + } + } + + self.processing_target = self.processing_target.min(validating_epoch); + self.current_start = self.current_start.min(validating_epoch); + self.to_be_downloaded = self.to_be_downloaded.min(validating_epoch); + + if self.batches.contains_key(&self.to_be_downloaded) { + // if custody backfill sync is advanced by Range beyond the previous `self.to_be_downloaded`, we + // won't have this batch, so we need to request it. + self.to_be_downloaded -= CUSTODY_BACKFILL_EPOCHS_PER_BATCH; + } + debug!(?validating_epoch, processing_target = ?self.processing_target, "Custody backfill advanced"); + } + + /// An invalid batch has been received that could not be processed, but that can be retried. + /// + /// These events occur when a peer has successfully responded with columns, but the columns + /// received are incorrect or invalid. This indicates the peer has not performed as + /// intended and can result in down voting a peer. + fn handle_invalid_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + ) -> Result<(), CustodyBackfillError> { + // The current batch could not be processed, indicating either the current or previous + // batches are invalid. + + // The previous batch could be incomplete due to the columns being too large to fit in + // a single RPC request or there could be consecutive empty batches which are not supposed + // to be there + + // The current (sub-optimal) strategy is to simply re-request all batches that could + // potentially be faulty. 
If a batch returns a different result than the original and + // results in successful processing, we downvote the original peer that sent us the batch. + + // this is our robust `processing_target`. All previous batches must be awaiting + // validation + let mut redownload_queue = Vec::new(); + + for (id, _) in self.batches.iter_mut().filter(|&(&id, _)| id > batch_id) { + redownload_queue.push(*id); + } + + // no batch maxed out it process attempts, so now the chain's volatile progress must be + // reset + self.processing_target = self.current_start; + + for id in redownload_queue { + self.send_batch(network, id)?; + } + // finally, re-request the failed batch. + self.send_batch(network, batch_id) + } + + /// Checks with the beacon chain if custody sync has completed. + fn check_completed(&mut self) -> bool { + if self.would_complete(self.current_start) { + // Check that the data column custody info `earliest_available_slot` + // is in an epoch that is less than or equal to the current DA boundary + let Some(earliest_data_column_epoch) = + self.beacon_chain.earliest_custodied_data_column_epoch() + else { + return false; + }; + + let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + return false; + }; + + return earliest_data_column_epoch <= column_da_boundary; + } + false + } + + /// Checks if custody backfill would complete by syncing to `start_epoch`. + fn would_complete(&self, start_epoch: Epoch) -> bool { + let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + return false; + }; + start_epoch <= column_da_boundary + } + + /// Requests the batch assigned to the given id from a given peer. 
+ fn send_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + ) -> Result<(), CustodyBackfillError> { + let span = info_span!(SPAN_CUSTODY_BACKFILL_SYNC_BATCH_REQUEST); + let _enter = span.enter(); + + if let Some(batch) = self.batches.get_mut(&batch_id) { + let synced_peers = self + .network_globals + .peers + .read() + .synced_peers_for_epoch(batch_id) + .cloned() + .collect::>(); + + let request = batch.to_data_columns_by_range_request().map_err(|_| { + CustodyBackfillError::InvalidSyncState( + "Can't convert to data column by range request".to_string(), + ) + })?; + let failed_peers = batch.failed_peers(); + + match network.custody_backfill_data_columns_batch_request( + request, + CustodyBackfillBatchId { + epoch: batch_id, + run_id: self.run_id, + }, + &synced_peers, + &failed_peers, + ) { + Ok(request_id) => { + // inform the batch about the new request + if let Err(e) = batch.start_downloading(request_id.id) { + return self + .fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0)); + } + debug!(epoch = %batch_id, %batch, "Requesting batch"); + + return Ok(()); + } + Err(e) => match e { + crate::sync::network_context::RpcRequestSendError::NoPeer(no_peer) => { + // If we are here we have no more synced peers + debug!( + "reason" = format!("insufficient_synced_peers({no_peer:?})"), + "Custody sync paused" + ); + self.pause("Insufficient peers".to_string()); + return Err(CustodyBackfillError::Paused); + } + crate::sync::network_context::RpcRequestSendError::InternalError(e) => { + // NOTE: under normal conditions this shouldn't happen but we handle it anyway + warn!(%batch_id, error = ?e, %batch,"Could not send batch request"); + // register the failed download and check if the batch can be retried + if let Err(e) = batch.start_downloading(1) { + return self + .fail_sync(CustodyBackfillError::BatchInvalidState(batch_id, e.0)); + } + + match batch.download_failed(None) { + Err(e) => 
self.fail_sync(CustodyBackfillError::BatchInvalidState( + batch_id, e.0, + ))?, + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + self.fail_sync(CustodyBackfillError::BatchDownloadFailed(batch_id))? + } + Ok(BatchOperationOutcome::Continue) => { + return self.send_batch(network, batch_id); + } + } + } + }, + } + } + + Ok(()) + } + + /// The syncing process has failed. + /// + /// This resets past variables, to allow for a fresh start when resuming. + fn fail_sync(&mut self, error: CustodyBackfillError) -> Result<(), CustodyBackfillError> { + // Some errors shouldn't cause failure. + if matches!(error, CustodyBackfillError::Paused) { + return Ok(()); + } + + // Set the state + self.pause("Sync has failed".to_string()); + // Remove all batches and active requests. + self.batches.clear(); + self.restart_failed_sync = false; + + // Reset all downloading and processing targets + // NOTE: Lets keep validated_batches for posterity + self.processing_target = self.current_start; + self.to_be_downloaded = self.current_start; + self.last_batch_downloaded = false; + self.current_processing_batch = None; + self.restart_sync(); + + Err(error) + } + + pub fn state(&self) -> CustodyBackFillState { + self.network_globals.custody_sync_state.read().clone() + } + + /// Updates the global network state indicating the current state of a backfill sync. + pub fn set_state(&self, state: CustodyBackFillState) { + *self.network_globals.custody_sync_state.write() = state; + } + + /// A fully synced peer has joined us. + /// If we are in a failed state, update a local variable to indicate we are able to restart + /// the failed sync on the next attempt. 
+ pub fn fully_synced_peer_joined(&mut self) { + if matches!(self.state(), CustodyBackFillState::Pending(_)) { + self.restart_failed_sync = true; + } + } +} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index d7ba028054..338f21ce98 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -46,7 +46,8 @@ use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, }; -use crate::sync::network_context::PeerGroup; +use crate::sync::custody_backfill_sync::CustodyBackFillSync; +use crate::sync::network_context::{PeerGroup, RpcResponseResult}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{ @@ -56,14 +57,16 @@ use futures::StreamExt; use lighthouse_network::SyncInfo; use lighthouse_network::rpc::RPCError; use lighthouse_network::service::api_types::{ - BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyRequester, - DataColumnsByRangeRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, - SingleLookupReqId, SyncRequestId, + BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, + CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyRequester, + DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, + DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::{PeerAction, PeerId}; use logging::crit; use lru_cache::LRUTimeCache; +use slot_clock::SlotClock; use std::ops::Sub; use std::sync::Arc; use std::time::Duration; @@ -158,6 +161,12 @@ pub enum SyncMessage { result: BatchProcessResult, }, + /// A custody batch has been processed by the processor thread. 
+ CustodyBatchProcessed { + batch_id: CustodyBackfillBatchId, + result: CustodyBatchProcessResult, + }, + /// Block processed BlockComponentProcessed { process_type: BlockProcessType, @@ -209,6 +218,19 @@ pub enum BatchProcessResult { NonFaultyFailure, } +/// The result of processing multiple data columns. +#[derive(Debug)] +pub enum CustodyBatchProcessResult { + /// The custody batch was completed successfully. It carries whether the sent batch contained data columns. + Success { + #[allow(dead_code)] + sent_columns: usize, + imported_columns: usize, + }, + /// The custody batch processing failed. + Error { peer_action: Option }, +} + /// The primary object for handling and driving all the current syncing logic. It maintains the /// current state of the syncing process, the number of useful peers, downloaded blocks and /// controls the logic behind both the long-range (batch) sync and the on-going potential parent @@ -229,6 +251,9 @@ pub struct SyncManager { /// Backfill syncing. backfill_sync: BackFillSync, + /// Custody syncing. + custody_backfill_sync: CustodyBackFillSync, + block_lookups: BlockLookups, /// debounce duplicated `UnknownBlockHashFromAttestation` for the same root peer tuple. A peer /// may forward us thousands of a attestations, each one triggering an individual event. Only @@ -288,7 +313,8 @@ impl SyncManager { fork_context.clone(), ), range_sync: RangeSync::new(beacon_chain.clone()), - backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals), + backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals.clone()), + custody_backfill_sync: CustodyBackFillSync::new(beacon_chain.clone(), network_globals), block_lookups: BlockLookups::new(), notified_unknown_roots: LRUTimeCache::new(Duration::from_secs( NOTIFIED_UNKNOWN_ROOT_EXPIRY_SECONDS, @@ -549,6 +575,7 @@ impl SyncManager { // inform the backfill sync that a new synced peer has joined us. 
if new_state.is_synced() { self.backfill_sync.fully_synced_peer_joined(); + self.custody_backfill_sync.fully_synced_peer_joined(); } } is_connected @@ -558,17 +585,18 @@ impl SyncManager { } } - /// Updates the global sync state, optionally instigating or pausing a backfill sync as well as + /// Updates the global sync state, optionally instigating or pausing a backfill or custody sync as well as /// logging any changes. /// /// The logic for which sync should be running is as follows: - /// - If there is a range-sync running (or required) pause any backfill and let range-sync + /// - If there is a range-sync running (or required) pause any backfill/custody sync and let range-sync /// complete. /// - If there is no current range sync, check for any requirement to backfill and either /// start/resume a backfill sync if required. The global state will be BackFillSync if a /// backfill sync is running. /// - If there is no range sync and no required backfill and we have synced up to the currently /// known peers, we consider ourselves synced. + /// - If there is no range sync and no required backfill we check if we need to execute a custody sync. fn update_sync_state(&mut self) { let new_state: SyncState = match self.range_sync.state() { Err(e) => { @@ -624,15 +652,51 @@ impl SyncManager { error!(error = ?e, "Backfill sync failed to start"); } } + + // If backfill is complete, check if we have a pending custody backfill to complete + let anchor_info = self.chain.store.get_anchor_info(); + if anchor_info.block_backfill_complete(self.chain.genesis_backfill_slot) { + match self.custody_backfill_sync.start(&mut self.network) { + Ok(SyncStart::Syncing { + completed, + remaining, + }) => { + sync_state = SyncState::CustodyBackFillSyncing { + completed, + remaining, + }; + } + Ok(SyncStart::NotSyncing) => {} // Ignore updating the state if custody sync state didn't start. 
+ Err(e) => { + use crate::sync::custody_backfill_sync::CustodyBackfillError; + + match &e { + CustodyBackfillError::BatchDownloadFailed(_) + | CustodyBackfillError::BatchProcessingFailed(_) => { + debug!(error=?e, "Custody backfill batch processing or downloading failed"); + } + CustodyBackfillError::BatchInvalidState(_, reason) => { + error!(error=?e, reason, "Custody backfill sync failed due to invalid batch state") + } + CustodyBackfillError::InvalidSyncState(reason) => { + error!(error=?e, reason, "Custody backfill sync failed due to invalid sync state") + } + CustodyBackfillError::Paused => {} + } + } + } + } } // Return the sync state if backfilling is not required. sync_state } Some((RangeSyncType::Finalized, start_slot, target_slot)) => { - // If there is a backfill sync in progress pause it. + // Range sync is in progress. If there is a backfill or custody sync in progress pause it. #[cfg(not(feature = "disable-backfill"))] self.backfill_sync.pause(); + self.custody_backfill_sync + .pause("Range sync in progress".to_string()); SyncState::SyncingFinalized { start_slot, @@ -640,9 +704,12 @@ impl SyncManager { } } Some((RangeSyncType::Head, start_slot, target_slot)) => { - // If there is a backfill sync in progress pause it. + // Range sync is in progress. If there is a backfill or custody backfill sync + // in progress pause it. #[cfg(not(feature = "disable-backfill"))] self.backfill_sync.pause(); + self.custody_backfill_sync + .pause("Range sync in progress".to_string()); SyncState::SyncingHead { start_slot, @@ -662,7 +729,9 @@ impl SyncManager { if new_state.is_synced() && !matches!( old_state, - SyncState::Synced | SyncState::BackFillSyncing { .. } + SyncState::Synced + | SyncState::BackFillSyncing { .. } + | SyncState::CustodyBackFillSyncing { .. 
} ) { self.network.subscribe_core_topics(); @@ -693,6 +762,11 @@ impl SyncManager { let mut register_metrics_interval = tokio::time::interval(Duration::from_secs(5)); + // Trigger a sync state update every epoch. This helps check if we need to trigger a custody backfill sync. + let epoch_duration = + self.chain.slot_clock.slot_duration().as_secs() * T::EthSpec::slots_per_epoch(); + let mut epoch_interval = tokio::time::interval(Duration::from_secs(epoch_duration)); + // process any inbound messages loop { tokio::select! { @@ -711,6 +785,9 @@ impl SyncManager { _ = register_metrics_interval.tick() => { self.network.register_metrics(); } + _ = epoch_interval.tick() => { + self.update_sync_state(); + } } } } @@ -865,6 +942,21 @@ impl SyncManager { } } }, + SyncMessage::CustodyBatchProcessed { result, batch_id } => { + match self.custody_backfill_sync.on_batch_process_result( + &mut self.network, + batch_id, + &result, + ) { + Ok(ProcessResult::Successful) => {} + Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), + Err(error) => { + error!(error = ?error, "Custody sync failed"); + // Update the global status + self.update_sync_state(); + } + } + } } } @@ -1081,11 +1173,13 @@ impl SyncManager { RpcEvent::from_chunk(data_column, seen_timestamp), ); } - SyncRequestId::DataColumnsByRange(id) => self.on_data_columns_by_range_response( - id, - peer_id, - RpcEvent::from_chunk(data_column, seen_timestamp), - ), + SyncRequestId::DataColumnsByRange(req_id) => { + self.on_data_columns_by_range_response( + req_id, + peer_id, + RpcEvent::from_chunk(data_column, seen_timestamp), + ); + } _ => { crit!(%peer_id, "bad request id for data_column"); } @@ -1173,11 +1267,22 @@ impl SyncManager { .network .on_data_columns_by_range_response(id, peer_id, data_column) { - self.on_range_components_response( - id.parent_request_id, - peer_id, - RangeBlockComponent::CustodyColumns(id, resp), - ); + match id.parent_request_id { + 
DataColumnsByRangeRequester::ComponentsByRange(components_by_range_req_id) => { + self.on_range_components_response( + components_by_range_req_id, + peer_id, + RangeBlockComponent::CustodyColumns(id, resp), + ); + } + DataColumnsByRangeRequester::CustodyBackfillSync(custody_backfill_req_id) => self + .on_custody_backfill_columns_response( + custody_backfill_req_id, + id, + peer_id, + resp, + ), + } } } @@ -1267,6 +1372,36 @@ impl SyncManager { } } } + + /// Handles receiving a response for a custody range sync request that has columns. + fn on_custody_backfill_columns_response( + &mut self, + custody_sync_request_id: CustodyBackFillBatchRequestId, + req_id: DataColumnsByRangeRequestId, + peer_id: PeerId, + data_columns: RpcResponseResult>>>, + ) { + if let Some(resp) = self.network.custody_backfill_data_columns_response( + custody_sync_request_id, + req_id, + data_columns, + ) { + match self.custody_backfill_sync.on_data_column_response( + &mut self.network, + custody_sync_request_id, + &peer_id, + resp, + ) { + Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), + Ok(ProcessResult::Successful) => {} + Err(_e) => { + // The custody sync has failed, errors are reported + // within. + self.update_sync_state(); + } + } + } + } } impl From> for BlockProcessingResult { diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 4dab2e17d3..054bab654c 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -2,14 +2,17 @@ //! //! Stores the various syncing methods for the beacon chain. 
mod backfill_sync; +mod batch; mod block_lookups; mod block_sidecar_coupling; +mod custody_backfill_sync; pub mod manager; mod network_context; mod peer_sync_info; +mod range_data_column_batch_request; mod range_sync; #[cfg(test)] mod tests; pub use manager::{BatchProcessResult, SyncMessage}; -pub use range_sync::{BatchOperationOutcome, ChainId}; +pub use range_sync::ChainId; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 1d119cb2de..2e0c56db23 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -6,16 +6,17 @@ pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlock use super::SyncMessage; use super::block_sidecar_coupling::RangeBlockComponentsRequest; use super::manager::BlockProcessType; -use super::range_sync::ByRangeRequestType; use crate::metrics; use crate::network_beacon_processor::NetworkBeaconProcessor; #[cfg(test)] use crate::network_beacon_processor::TestBeaconChainType; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; +use crate::sync::batch::ByRangeRequestType; use crate::sync::block_lookups::SingleLookupId; use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest; +use crate::sync::range_data_column_batch_request::RangeDataColumnBatchRequest; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use custody::CustodyRequestResult; @@ -25,7 +26,8 @@ use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, Req pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, - CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRootRequestId, + 
CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyId, CustodyRequester, + DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; @@ -211,7 +213,6 @@ pub struct SyncNetworkContext { /// A mapping of active DataColumnsByRange requests data_columns_by_range_requests: ActiveRequests>, - /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, @@ -219,6 +220,10 @@ pub struct SyncNetworkContext { components_by_range_requests: FnvHashMap>, + /// A batch of data columns by range request for custody sync + custody_backfill_data_column_batch_requests: + FnvHashMap>, + /// Whether the ee is online. If it's not, we don't allow access to the /// `beacon_processor_send`. execution_engine_state: EngineState, @@ -295,6 +300,7 @@ impl SyncNetworkContext { data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), custody_by_root_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), + custody_backfill_data_column_batch_requests: FnvHashMap::default(), network_beacon_processor, chain, fork_context, @@ -324,6 +330,7 @@ impl SyncNetworkContext { custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests components_by_range_requests: _, + custody_backfill_data_column_batch_requests: _, execution_engine_state: _, network_beacon_processor: _, chain: _, @@ -354,7 +361,6 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); - blocks_by_root_ids .chain(blobs_by_root_ids) .chain(data_column_by_root_ids) @@ -421,6 +427,7 @@ impl SyncNetworkContext { custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests components_by_range_requests: 
_, + custody_backfill_data_column_batch_requests: _, execution_engine_state: _, network_beacon_processor: _, chain: _, @@ -503,7 +510,7 @@ impl SyncNetworkContext { count: *request.count(), columns, }, - id, + DataColumnsByRangeRequester::ComponentsByRange(id), new_range_request_span!( self, "outgoing_columns_by_range_retry", @@ -638,7 +645,7 @@ impl SyncNetworkContext { count: *request.count(), columns, }, - id, + DataColumnsByRangeRequester::ComponentsByRange(id), new_range_request_span!( self, "outgoing_columns_by_range", @@ -1238,7 +1245,7 @@ impl SyncNetworkContext { &mut self, peer_id: PeerId, request: DataColumnsByRangeRequest, - parent_request_id: ComponentsByRangeRequestId, + parent_request_id: DataColumnsByRangeRequester, request_span: Span, ) -> Result<(DataColumnsByRangeRequestId, Vec), RpcRequestSendError> { let requested_columns = request.columns.clone(); @@ -1679,6 +1686,111 @@ impl SyncNetworkContext { }) } + /// data column by range requests sent by the custody sync algorithm + pub fn custody_backfill_data_columns_batch_request( + &mut self, + request: DataColumnsByRangeRequest, + batch_id: CustodyBackfillBatchId, + peers: &HashSet, + peers_to_deprioritize: &HashSet, + ) -> Result { + let active_request_count_by_peer = self.active_request_count_by_peer(); + // Attempt to find all required custody peers before sending any request or creating an ID + let columns_by_range_peers_to_request = { + let column_indexes = self + .chain + .sampling_columns_for_epoch(batch_id.epoch) + .iter() + .cloned() + .collect(); + + self.select_columns_by_range_peers_to_request( + &column_indexes, + peers, + active_request_count_by_peer, + peers_to_deprioritize, + )? 
+ }; + + // Create the overall `custody_by_range` request id + let id = CustodyBackFillBatchRequestId { + id: self.next_id(), + batch_id, + }; + + let result = columns_by_range_peers_to_request + .iter() + .filter_map(|(peer_id, _)| { + self.send_data_columns_by_range_request( + *peer_id, + request.clone(), + DataColumnsByRangeRequester::CustodyBackfillSync(id), + Span::none(), + ) + .ok() + }) + .collect::>(); + + let range_data_column_batch_request = + RangeDataColumnBatchRequest::new(result, self.chain.clone(), batch_id.epoch); + + self.custody_backfill_data_column_batch_requests + .insert(id, range_data_column_batch_request); + + Ok(id) + } + + /// Received a data columns by range response from a custody sync request which batches them. + pub fn custody_backfill_data_columns_response( + &mut self, + // Identifies the custody backfill request for all data columns on this epoch + custody_sync_request_id: CustodyBackFillBatchRequestId, + // Identifies a specific data_columns_by_range request for *some* columns in this epoch. We + // pass them separately as DataColumnsByRangeRequestId parent is an enum and would require + // matching again. 
+ req_id: DataColumnsByRangeRequestId, + data_columns: RpcResponseResult>, + ) -> Option, RpcResponseError>> { + let Entry::Occupied(mut entry) = self + .custody_backfill_data_column_batch_requests + .entry(custody_sync_request_id) + else { + metrics::inc_counter_vec( + &metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, + &["range_data_columns"], + ); + return None; + }; + + if let Err(e) = { + let request = entry.get_mut(); + data_columns.and_then(|(data_columns, _)| { + request + .add_custody_columns(req_id, data_columns.clone()) + .map_err(|e| { + RpcResponseError::BlockComponentCouplingError(CouplingError::InternalError( + e, + )) + }) + }) + } { + entry.remove(); + return Some(Err(e)); + } + + if let Some(data_column_result) = entry.get_mut().responses() { + if data_column_result.is_ok() { + // remove the entry only if it coupled successfully with + // no errors + entry.remove(); + } + // If the request is finished, dequeue everything + Some(data_column_result.map_err(RpcResponseError::BlockComponentCouplingError)) + } else { + None + } + } + pub(crate) fn register_metrics(&self) { for (id, count) in [ ("blocks_by_root", self.blocks_by_root_requests.len()), diff --git a/beacon_node/network/src/sync/range_data_column_batch_request.rs b/beacon_node/network/src/sync/range_data_column_batch_request.rs new file mode 100644 index 0000000000..542d99d97c --- /dev/null +++ b/beacon_node/network/src/sync/range_data_column_batch_request.rs @@ -0,0 +1,297 @@ +use std::collections::{HashMap, HashSet}; + +use crate::sync::block_sidecar_coupling::{ByRangeRequest, CouplingError}; +use crate::sync::network_context::MAX_COLUMN_RETRIES; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use itertools::Itertools; +use lighthouse_network::PeerId; +use lighthouse_network::service::api_types::DataColumnsByRangeRequestId; +use std::sync::Arc; +use types::{ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Slot}; + +pub struct RangeDataColumnBatchRequest { + requests: 
HashMap< + DataColumnsByRangeRequestId, + ByRangeRequest>, + >, + /// The column indices corresponding to the request + column_peers: HashMap>, + expected_custody_columns: HashSet, + attempt: usize, + beacon_chain: Arc>, + epoch: Epoch, +} + +impl RangeDataColumnBatchRequest { + pub fn new( + by_range_requests: Vec<(DataColumnsByRangeRequestId, Vec)>, + beacon_chain: Arc>, + epoch: Epoch, + ) -> Self { + let requests = by_range_requests + .clone() + .into_iter() + .map(|(req, _)| (req, ByRangeRequest::Active(req))) + .collect::>(); + + let column_peers = by_range_requests.clone().into_iter().collect(); + + let expected_custody_columns = by_range_requests + .into_iter() + .flat_map(|(_, column_indices)| column_indices) + .collect(); + + Self { + requests, + column_peers, + expected_custody_columns, + beacon_chain, + epoch, + attempt: 0, + } + } + + pub fn add_custody_columns( + &mut self, + req_id: DataColumnsByRangeRequestId, + columns: Vec>>, + ) -> Result<(), String> { + let req = self + .requests + .get_mut(&req_id) + .ok_or(format!("unknown data columns by range req_id {req_id}"))?; + req.finish(req_id, columns) + } + + pub fn responses( + &mut self, + ) -> Option, CouplingError>> { + let mut received_columns_for_slot: HashMap> = + HashMap::new(); + let mut column_to_peer_id: HashMap = HashMap::new(); + + for column in self + .requests + .values() + .filter_map(|req| req.to_finished()) + .flatten() + { + received_columns_for_slot + .entry(column.slot()) + .or_default() + .push(column.clone()); + } + + // Note: this assumes that only 1 peer is responsible for a column + // with a batch. + for (id, columns) in self.column_peers.iter() { + for column in columns { + column_to_peer_id.insert(*column, id.peer); + } + } + + // An "attempt" is complete here after we have received a response for all the + // requests we made. i.e. `req.to_finished()` returns Some for all requests. 
+        self.attempt += 1;
+
+        let resp = self.responses_with_custody_columns(
+            received_columns_for_slot,
+            column_to_peer_id,
+            &self.expected_custody_columns,
+            self.attempt,
+        );
+
+        if let Err(CouplingError::DataColumnPeerFailure {
+            error: _,
+            faulty_peers,
+            exceeded_retries: _,
+        }) = &resp
+        {
+            for (_, peer) in faulty_peers.iter() {
+                // find the req id associated with the peer and
+                // delete it from the entries as we are going to make
+                // a separate attempt for those components.
+                self.requests.retain(|&k, _| k.peer != *peer);
+            }
+        }
+        Some(resp)
+    }
+
+    fn responses_with_custody_columns(
+        &self,
+        mut received_columns_for_slot: HashMap>,
+        column_to_peer: HashMap,
+        expected_custody_columns: &HashSet,
+        attempt: usize,
+    ) -> Result, CouplingError> {
+        let mut naughty_peers = vec![];
+        let mut result: DataColumnSidecarList = vec![];
+
+        let forward_blocks_iter = self
+            .beacon_chain
+            .forwards_iter_block_roots_until(
+                self.epoch.start_slot(T::EthSpec::slots_per_epoch()),
+                self.epoch.end_slot(T::EthSpec::slots_per_epoch()),
+            )
+            .map_err(|_| {
+                CouplingError::InternalError("Failed to fetch block root iterator".to_string())
+            })?;
+
+        for block_iter_result in forward_blocks_iter {
+            let (block_root, slot) = block_iter_result.map_err(|_| {
+                CouplingError::InternalError("Failed to iterate block roots".to_string())
+            })?;
+
+            let Some(block) = self
+                .beacon_chain
+                .get_blinded_block(&block_root)
+                .ok()
+                .flatten()
+            else {
+                // The block root we are fetching is from the forwards block root iterator. This doesn't seem like a possible scenario.
+                return Err(CouplingError::InternalError(
+                    "Block root from forwards block iterator not found in db".to_string(),
+                ));
+            };
+
+            let Some(columns) = received_columns_for_slot.remove(&slot) else {
+                // If at least one blob is expected for this slot but none have been served, penalize all peers
+                // The slot check ensures we aren't checking a skipped slot.
+                if block.num_expected_blobs() != 0 && block.slot() == slot {
+                    for column in expected_custody_columns {
+                        if let Some(naughty_peer) = column_to_peer.get(column) {
+                            naughty_peers.push((*column, *naughty_peer));
+                        }
+                    }
+                }
+                continue;
+            };
+
+            // This is a skipped slot, skip to the next slot after we verify that peers
+            // didn't serve us columns for a skipped slot
+            if block.slot() != slot {
+                // If we received columns for a skipped slot, punish the peer
+                if !columns.is_empty() {
+                    for column in expected_custody_columns {
+                        if let Some(naughty_peer) = column_to_peer.get(column) {
+                            naughty_peers.push((*column, *naughty_peer));
+                        }
+                    }
+                }
+
+                continue;
+            }
+
+            let column_block_roots = columns
+                .iter()
+                .map(|column| column.block_root())
+                .unique()
+                .collect::>();
+
+            let column_block_signatures = columns
+                .iter()
+                .map(|column| column.signed_block_header.signature.clone())
+                .unique()
+                .collect::>();
+
+            let column_block_root = match column_block_roots.as_slice() {
+                // We expect a single unique block root
+                [column_block_root] => *column_block_root,
+                // If there are no block roots, penalize all peers
+                [] => {
+                    for column in &columns {
+                        if let Some(naughty_peer) = column_to_peer.get(&column.index) {
+                            naughty_peers.push((column.index, *naughty_peer));
+                        }
+                    }
+                    continue;
+                }
+                // If there's more than one unique block root penalize the peers serving the bad block roots.
+                column_block_roots => {
+                    for column in columns {
+                        if column_block_roots.contains(&column.block_root())
+                            && block_root != column.block_root()
+                            && let Some(naughty_peer) = column_to_peer.get(&column.index)
+                        {
+                            naughty_peers.push((column.index, *naughty_peer));
+                        }
+                    }
+                    continue;
+                }
+            };
+
+            let column_block_signature = match column_block_signatures.as_slice() {
+                // We expect a single unique block signature
+                [block_signature] => block_signature,
+                // If there are no block signatures, penalize all peers
+                [] => {
+                    for column in &columns {
+                        if let Some(naughty_peer) = column_to_peer.get(&column.index) {
+                            naughty_peers.push((column.index, *naughty_peer));
+                        }
+                    }
+                    continue;
+                }
+                // If there's more than one unique block signature, penalize the peers serving the
+                // invalid block signatures.
+                column_block_signatures => {
+                    for column in columns {
+                        if column_block_signatures.contains(&column.signed_block_header.signature)
+                            && block.signature() != &column.signed_block_header.signature
+                            && let Some(naughty_peer) = column_to_peer.get(&column.index)
+                        {
+                            naughty_peers.push((column.index, *naughty_peer));
+                        }
+                    }
+                    continue;
+                }
+            };
+
+            // if the block root doesn't match the columns block root, penalize the peers
+            if block_root != column_block_root {
+                for column in &columns {
+                    if let Some(naughty_peer) = column_to_peer.get(&column.index) {
+                        naughty_peers.push((column.index, *naughty_peer));
+                    }
+                }
+            }
+
+            // If the block signature doesn't match the columns block signature, penalize the peers
+            if block.signature() != column_block_signature {
+                for column in &columns {
+                    if let Some(naughty_peer) = column_to_peer.get(&column.index) {
+                        naughty_peers.push((column.index, *naughty_peer));
+                    }
+                }
+            }
+
+            let received_columns = columns.iter().map(|c| c.index).collect::>();
+
+            let missing_columns = received_columns
+                .difference(expected_custody_columns)
+                .collect::>();
+
+            // blobs are expected for this slot but there is at least one missing column
+            // 
penalize the peers responsible for those columns. + if block.num_expected_blobs() != 0 && !missing_columns.is_empty() { + for column in missing_columns { + if let Some(naughty_peer) = column_to_peer.get(column) { + naughty_peers.push((*column, *naughty_peer)); + }; + } + } + + result.extend(columns); + } + + if !naughty_peers.is_empty() { + return Err(CouplingError::DataColumnPeerFailure { + error: "Bad or missing columns for some slots".to_string(), + faulty_peers: naughty_peers, + exceeded_retries: attempt >= MAX_COLUMN_RETRIES, + }); + } + + Ok(result) + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index ab5b8bee5e..014d728ffe 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,10 +1,13 @@ use super::RangeSyncType; -use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; +use crate::sync::batch::BatchId; +use crate::sync::batch::{ + BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, +}; use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError}; -use crate::sync::{BatchOperationOutcome, BatchProcessResult, network_context::SyncNetworkContext}; +use crate::sync::{BatchProcessResult, network_context::SyncNetworkContext}; use beacon_chain::BeaconChainTypes; use beacon_chain::block_verification_types::RpcBlock; use lighthouse_network::service::api_types::Id; @@ -12,6 +15,8 @@ use lighthouse_network::{PeerAction, PeerId}; use lighthouse_tracing::SPAN_SYNCING_CHAIN; use logging::crit; use std::collections::{BTreeMap, HashSet, btree_map::Entry}; +use std::hash::{Hash, Hasher}; +use std::marker::PhantomData; use strum::IntoStaticStr; use tracing::{Span, debug, instrument, warn}; use types::{ColumnIndex, Epoch, EthSpec, 
Hash256, Slot}; @@ -35,6 +40,35 @@ const BATCH_BUFFER_SIZE: u8 = 5; /// and continued is now in an inconsistent state. pub type ProcessingResult = Result; +type RpcBlocks = Vec>; +type RangeSyncBatchInfo = BatchInfo, RpcBlocks>; +type RangeSyncBatches = BTreeMap>; + +/// The number of times to retry a batch before it is considered failed. +const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; + +/// Invalid batches are attempted to be re-downloaded from other peers. If a batch cannot be processed +/// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. +const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; + +pub struct RangeSyncBatchConfig { + marker: PhantomData, +} + +impl BatchConfig for RangeSyncBatchConfig { + fn max_batch_download_attempts() -> u8 { + MAX_BATCH_DOWNLOAD_ATTEMPTS + } + fn max_batch_processing_attempts() -> u8 { + MAX_BATCH_PROCESSING_ATTEMPTS + } + fn batch_attempt_hash(data: &D) -> u64 { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + data.hash(&mut hasher); + hasher.finish() + } +} + /// Reasons for removing a chain #[derive(Debug)] #[allow(dead_code)] @@ -55,7 +89,6 @@ pub struct KeepChain; /// A chain identifier pub type ChainId = Id; -pub type BatchId = Epoch; #[derive(Debug, Copy, Clone, IntoStaticStr)] pub enum SyncingChainType { @@ -85,7 +118,7 @@ pub struct SyncingChain { pub target_head_root: Hash256, /// Sorted map of batches undergoing some kind of processing. - batches: BTreeMap>, + batches: RangeSyncBatches, /// The peers that agree on the `target_head_slot` and `target_head_root` as a canonical chain /// and thus available to download this chain from, as well as the batches we are currently @@ -249,7 +282,7 @@ impl SyncingChain { // request_id matches // TODO(das): removed peer_id matching as the node may request a different peer for data // columns. 
- if !batch.is_expecting_block(&request_id) { + if !batch.is_expecting_request_id(&request_id) { return Ok(KeepChain); } batch @@ -260,7 +293,8 @@ impl SyncingChain { // Remove the request from the peer's active batches // TODO(das): should use peer group here https://github.com/sigp/lighthouse/issues/6258 - let received = batch.download_completed(blocks, *peer_id)?; + let received = blocks.len(); + batch.download_completed(blocks, *peer_id)?; let awaiting_batches = batch_id .saturating_sub(self.optimistic_start.unwrap_or(self.processing_target)) / EPOCHS_PER_BATCH; @@ -918,7 +952,7 @@ impl SyncingChain { // A batch could be retried without the peer failing the request (disconnecting/ // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer - if !batch.is_expecting_block(&request_id) { + if !batch.is_expecting_request_id(&request_id) { debug!( batch_epoch = %batch_id, batch_state = ?batch.state(), @@ -1233,7 +1267,7 @@ impl SyncingChain { // only request batches up to the buffer size limit // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync // if the current processing window is contained in a long range of skip slots. - let in_buffer = |batch: &BatchInfo| { + let in_buffer = |batch: &RangeSyncBatchInfo| { matches!( batch.state(), BatchState::Downloading(..) | BatchState::AwaitingProcessing(..) @@ -1320,7 +1354,7 @@ impl SyncingChain { } } -use super::batch::WrongState as WrongBatchState; +use crate::sync::batch::WrongState as WrongBatchState; impl From for RemoveChain { fn from(err: WrongBatchState) -> Self { RemoveChain::WrongBatchState(err.0) diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 8f881fba90..dd9f17bfd1 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -1,17 +1,11 @@ //! 
This provides the logic for syncing a chain when the local node is far behind it's current //! peers. - -mod batch; mod chain; mod chain_collection; mod range; mod sync_type; -pub use batch::{ - BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, - ByRangeRequestType, -}; -pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; +pub use chain::{ChainId, EPOCHS_PER_BATCH}; #[cfg(test)] pub use chain_collection::SyncChainStatus; pub use range::RangeSync; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 465edd3697..c9656ad1d0 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -39,12 +39,13 @@ //! Each chain is downloaded in batches of blocks. The batched blocks are processed sequentially //! and further batches are requested as current blocks are being processed. -use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain}; +use super::chain::{ChainId, RemoveChain, SyncingChain}; use super::chain_collection::{ChainCollection, SyncChainStatus}; use super::sync_type::RangeSyncType; use crate::metrics; use crate::status::ToStatusMessage; use crate::sync::BatchProcessResult; +use crate::sync::batch::BatchId; use crate::sync::network_context::{RpcResponseError, SyncNetworkContext}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes}; diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index d58cf2e731..895afa4f33 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -949,6 +949,19 @@ impl, Cold: ItemStore> HotColdDB )); } + pub fn data_column_as_kv_store_ops( + &self, + block_root: &Hash256, + data_column: Arc>, + ops: &mut Vec, + ) { + ops.push(KeyValueStoreOp::PutKeyValue( + DBColumn::BeaconDataColumn, + get_data_column_key(block_root, &data_column.index), + 
data_column.as_ssz_bytes(), + )); + } + pub fn put_data_column_custody_info( &self, earliest_data_column_slot: Option, diff --git a/common/eth2/src/lighthouse/sync_state.rs b/common/eth2/src/lighthouse/sync_state.rs index 0327f7073f..9f6f3b52e0 100644 --- a/common/eth2/src/lighthouse/sync_state.rs +++ b/common/eth2/src/lighthouse/sync_state.rs @@ -15,6 +15,10 @@ pub enum SyncState { /// specified by its peers. Once completed, the node enters this sync state and attempts to /// download all required historical blocks. BackFillSyncing { completed: usize, remaining: usize }, + /// The node is undertaking a custody backfill sync. This occurs for a node that has completed forward and + /// backfill sync and has undergone a custody count change. During custody backfill sync the node attempts + /// to backfill its new column custody requirements up to the data availability window. + CustodyBackFillSyncing { completed: usize, remaining: usize }, /// The node has completed syncing a finalized chain and is in the process of re-evaluating /// which sync state to progress to. SyncTransition, @@ -39,6 +43,17 @@ pub enum BackFillState { Failed, } +#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)] +/// The state of the custody backfill sync. +pub enum CustodyBackFillState { + /// We are currently backfilling custody columns. + Syncing, + /// A custody backfill sync has completed. + Completed, + /// A custody backfill sync is set to Pending for various reasons. + Pending(String), +} + impl PartialEq for SyncState { fn eq(&self, other: &Self) -> bool { matches!( @@ -54,6 +69,10 @@ impl PartialEq for SyncState { SyncState::BackFillSyncing { .. }, SyncState::BackFillSyncing { .. } ) + | ( + SyncState::CustodyBackFillSyncing { .. }, + SyncState::CustodyBackFillSyncing { .. } + ) ) ) } } @@ -65,8 +84,8 @@ impl SyncState { SyncState::SyncingFinalized { .. } => true, SyncState::SyncingHead { ..
} => true, SyncState::SyncTransition => true, - // Backfill doesn't effect any logic, we consider this state, not syncing. - SyncState::BackFillSyncing { .. } => false, + // Both backfill and custody backfill don't affect any logic, we consider this state, not syncing. + SyncState::BackFillSyncing { .. } | SyncState::CustodyBackFillSyncing { .. } => false, SyncState::Synced => false, SyncState::Stalled => false, } } @@ -77,7 +96,7 @@ impl SyncState { SyncState::SyncingFinalized { .. } => true, SyncState::SyncingHead { .. } => false, SyncState::SyncTransition => false, - SyncState::BackFillSyncing { .. } => false, + SyncState::BackFillSyncing { .. } | SyncState::CustodyBackFillSyncing { .. } => false, SyncState::Synced => false, SyncState::Stalled => false, } } @@ -87,7 +106,12 @@ impl SyncState { /// /// NOTE: We consider the node synced if it is fetching old historical blocks. pub fn is_synced(&self) -> bool { - matches!(self, SyncState::Synced | SyncState::BackFillSyncing { .. }) + matches!( + self, + SyncState::Synced + | SyncState::BackFillSyncing { .. } + | SyncState::CustodyBackFillSyncing { .. } + ) } /// Returns true if the node is *stalled*, i.e. has no synced peers. @@ -108,6 +132,9 @@ impl std::fmt::Display for SyncState { SyncState::Stalled => write!(f, "Stalled"), SyncState::SyncTransition => write!(f, "Evaluating known peers"), SyncState::BackFillSyncing { .. } => write!(f, "Syncing Historical Blocks"), + SyncState::CustodyBackFillSyncing { ..
} => { + write!(f, "Syncing Historical Data Columns") + } } } } diff --git a/consensus/types/src/slot_epoch.rs b/consensus/types/src/slot_epoch.rs index 857044f981..05af9c5232 100644 --- a/consensus/types/src/slot_epoch.rs +++ b/consensus/types/src/slot_epoch.rs @@ -33,6 +33,13 @@ pub struct Slot(#[serde(with = "serde_utils::quoted_u64")] u64); #[serde(transparent)] pub struct Epoch(#[serde(with = "serde_utils::quoted_u64")] u64); +impl Epoch { + /// Returns an iterator `(end..=start)` + pub fn range_inclusive_rev(start: Self, end: Self) -> impl Iterator { + (end.0..=start.0).rev().map(Epoch) + } +} + impl_common!(Slot); impl_common!(Epoch); diff --git a/scripts/tests/checkpoint-sync.sh b/scripts/tests/checkpoint-sync.sh index df03da042e..605dc504f5 100755 --- a/scripts/tests/checkpoint-sync.sh +++ b/scripts/tests/checkpoint-sync.sh @@ -102,7 +102,8 @@ node_completed["fullnode"]=false echo "Polling sync status until backfill reaches ${TARGET_BACKFILL_SLOTS} slots or timeout of ${TIMEOUT_MINS} mins" -while [ "${node_completed[supernode]}" = false ] || [ "${node_completed[fullnode]}" = false ]; do +# while [ "${node_completed[supernode]}" = false ] || [ "${node_completed[fullnode]}" = false ]; do +while [ "${node_completed[fullnode]}" = false ]; do current_time=$(date +%s) elapsed=$((current_time - start_time)) @@ -112,7 +113,8 @@ while [ "${node_completed[supernode]}" = false ] || [ "${node_completed[fullnode fi # Poll each node that hasn't completed yet - for node in "supernode" "fullnode"; do + # for node in "supernode" "fullnode"; do + for node in "fullnode"; do if [ "${node_completed[$node]}" = false ]; then poll_node "$node" fi @@ -121,7 +123,7 @@ while [ "${node_completed[supernode]}" = false ] || [ "${node_completed[fullnode sleep $POLL_INTERVAL_SECS done -echo "Sync test complete! Both supernode and fullnode have synced to HEAD and backfilled ${TARGET_BACKFILL_SLOTS} slots." 
-echo "Supernode time: $((node_complete_time[supernode] - start_time)) seconds" +echo "Sync test complete! Fullnode has synced to HEAD and backfilled ${TARGET_BACKFILL_SLOTS} slots." +# echo "Supernode time: $((node_complete_time[supernode] - start_time)) seconds" echo "Fullnode time: $((node_complete_time[fullnode] - start_time)) seconds" exit_and_dump_logs 0 \ No newline at end of file