diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 13b8411695..76e5d031aa 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -99,6 +99,7 @@ jobs: --platform=linux/${SHORT_ARCH} \ --file ./Dockerfile.cross . \ --tag ${IMAGE_NAME}:${VERSION}-${SHORT_ARCH}${VERSION_SUFFIX}${MODERNITY_SUFFIX} \ + --provenance=false \ --push build-docker-multiarch: name: build-docker-multiarch${{ matrix.modernity }} diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index ccade998e5..57fee71830 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -377,7 +377,7 @@ jobs: with: repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Install cargo-udeps - run: cargo install cargo-udeps --locked --force --version $CARGO_UDEPS_VERSION + run: cargo install cargo-udeps --locked --force - name: Create Cargo config dir run: mkdir -p .cargo - name: Install custom Cargo config diff --git a/Cargo.lock b/Cargo.lock index 7756a487b7..bb77428153 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2597,7 +2597,8 @@ dependencies = [ [[package]] name = "fixed-hash" version = "0.7.0" -source = "git+https://github.com/paritytech/parity-common?rev=df638ab0885293d21d656dc300d39236b69ce57d#df638ab0885293d21d656dc300d39236b69ce57d" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcf0ed7fe52a17a03854ec54a9f76d6d84508d1c0e66bc1793301c73fc8493c" dependencies = [ "byteorder", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index 7cf2cd8938..93838adcac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,6 @@ resolver = "2" [patch] [patch.crates-io] -fixed-hash = { git = "https://github.com/paritytech/parity-common", rev="df638ab0885293d21d656dc300d39236b69ce57d" } warp = { git = "https://github.com/macladson/warp", rev="7e75acc368229a46a236a8c991bf251fe7fe50ef" } # FIXME(sproul): remove ssz_types = { git = "https://github.com/sigp/ssz_types", branch = "abstract-serde" } diff --git a/Makefile b/Makefile index 68ada1b4b9..ebad9b63f8 100644 --- a/Makefile +++ b/Makefile @@ -164,7 +164,8 @@ lint: -A clippy::from-over-into \ -A clippy::upper-case-acronyms \ -A clippy::vec-init-then-push \ - -A clippy::question-mark + -A clippy::question-mark \ + -A clippy::uninlined-format-args nightly-lint: cp .github/custom/clippy.toml . diff --git a/beacon_node/beacon_chain/src/attestation_rewards.rs b/beacon_node/beacon_chain/src/attestation_rewards.rs new file mode 100644 index 0000000000..3f39946978 --- /dev/null +++ b/beacon_node/beacon_chain/src/attestation_rewards.rs @@ -0,0 +1,196 @@ +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use eth2::lighthouse::attestation_rewards::{IdealAttestationRewards, TotalAttestationRewards}; +use eth2::lighthouse::StandardAttestationRewards; +use participation_cache::ParticipationCache; +use safe_arith::SafeArith; +use slog::{debug, Logger}; +use state_processing::{ + common::altair::BaseRewardPerIncrement, + per_epoch_processing::altair::{participation_cache, rewards_and_penalties::get_flag_weight}, +}; +use std::collections::HashMap; +use store::consts::altair::{ + PARTICIPATION_FLAG_WEIGHTS, TIMELY_HEAD_FLAG_INDEX, TIMELY_SOURCE_FLAG_INDEX, + TIMELY_TARGET_FLAG_INDEX, +}; +use types::consts::altair::WEIGHT_DENOMINATOR; + +use types::{Epoch, EthSpec}; + +use eth2::types::ValidatorId; + +impl BeaconChain { + pub fn compute_attestation_rewards( + &self, + epoch: Epoch, + validators: Vec, + log: Logger, + ) -> Result { + debug!(log, "computing attestation rewards"; "epoch" => epoch, "validator_count" => validators.len()); + + // Get state + let spec = &self.spec; + + let state_slot = (epoch + 1).end_slot(T::EthSpec::slots_per_epoch()); + + let state_root = self + .state_root_at_slot(state_slot)? + .ok_or(BeaconChainError::NoStateForSlot(state_slot))?; + + let mut state = self + .get_state(&state_root, Some(state_slot))? + .ok_or(BeaconChainError::MissingBeaconState(state_root))?; + + // Calculate ideal_rewards + let participation_cache = ParticipationCache::new(&state, spec)?; + + let previous_epoch = state.previous_epoch(); + + let mut ideal_rewards_hashmap = HashMap::new(); + + for flag_index in 0..PARTICIPATION_FLAG_WEIGHTS.len() { + let weight = get_flag_weight(flag_index) + .map_err(|_| BeaconChainError::AttestationRewardsError)?; + + let unslashed_participating_indices = participation_cache + .get_unslashed_participating_indices(flag_index, previous_epoch)?; + + let unslashed_participating_balance = + unslashed_participating_indices + .total_balance() + .map_err(|_| BeaconChainError::AttestationRewardsError)?; + + let unslashed_participating_increments = + unslashed_participating_balance.safe_div(spec.effective_balance_increment)?; + + let total_active_balance = participation_cache.current_epoch_total_active_balance(); + + let active_increments = + total_active_balance.safe_div(spec.effective_balance_increment)?; + + let base_reward_per_increment = + BaseRewardPerIncrement::new(total_active_balance, spec)?; + + for effective_balance_eth in 0..=32 { + let base_reward = + effective_balance_eth.safe_mul(base_reward_per_increment.as_u64())?; + + let penalty = -(base_reward.safe_mul(weight)?.safe_div(WEIGHT_DENOMINATOR)? as i64); + + let reward_numerator = base_reward + .safe_mul(weight)? + .safe_mul(unslashed_participating_increments)?; + + let ideal_reward = reward_numerator + .safe_div(active_increments)? + .safe_div(WEIGHT_DENOMINATOR)?; + if !state.is_in_inactivity_leak(previous_epoch, spec) { + ideal_rewards_hashmap + .insert((flag_index, effective_balance_eth), (ideal_reward, penalty)); + } else { + ideal_rewards_hashmap.insert((flag_index, effective_balance_eth), (0, penalty)); + } + } + } + + // Calculate total_rewards + let mut total_rewards: Vec = Vec::new(); + + let validators = if validators.is_empty() { + participation_cache.eligible_validator_indices().to_vec() + } else { + validators + .into_iter() + .map(|validator| match validator { + ValidatorId::Index(i) => Ok(i as usize), + ValidatorId::PublicKey(pubkey) => state + .get_validator_index(&pubkey)? + .ok_or(BeaconChainError::ValidatorPubkeyUnknown(pubkey)), + }) + .collect::, _>>()? + }; + + for validator_index in &validators { + let eligible = state.is_eligible_validator(previous_epoch, *validator_index)?; + let mut head_reward = 0u64; + let mut target_reward = 0i64; + let mut source_reward = 0i64; + + if eligible { + let effective_balance = state.get_effective_balance(*validator_index)?; + + let effective_balance_eth = + effective_balance.safe_div(spec.effective_balance_increment)?; + + for flag_index in 0..PARTICIPATION_FLAG_WEIGHTS.len() { + let (ideal_reward, penalty) = ideal_rewards_hashmap + .get(&(flag_index, effective_balance_eth)) + .ok_or(BeaconChainError::AttestationRewardsError)?; + let voted_correctly = participation_cache + .get_unslashed_participating_indices(flag_index, previous_epoch) + .map_err(|_| BeaconChainError::AttestationRewardsError)? + .contains(*validator_index) + .map_err(|_| BeaconChainError::AttestationRewardsError)?; + if voted_correctly { + if flag_index == TIMELY_HEAD_FLAG_INDEX { + head_reward += ideal_reward; + } else if flag_index == TIMELY_TARGET_FLAG_INDEX { + target_reward += *ideal_reward as i64; + } else if flag_index == TIMELY_SOURCE_FLAG_INDEX { + source_reward += *ideal_reward as i64; + } + } else if flag_index == TIMELY_HEAD_FLAG_INDEX { + head_reward = 0; + } else if flag_index == TIMELY_TARGET_FLAG_INDEX { + target_reward = *penalty; + } else if flag_index == TIMELY_SOURCE_FLAG_INDEX { + source_reward = *penalty; + } + } + } + total_rewards.push(TotalAttestationRewards { + validator_index: *validator_index as u64, + head: head_reward, + target: target_reward, + source: source_reward, + }); + } + + // Convert hashmap to vector + let mut ideal_rewards: Vec = ideal_rewards_hashmap + .iter() + .map( + |((flag_index, effective_balance_eth), (ideal_reward, _penalty))| { + (flag_index, effective_balance_eth, ideal_reward) + }, + ) + .fold( + HashMap::new(), + |mut acc, (flag_index, effective_balance_eth, ideal_reward)| { + let entry = acc.entry(*effective_balance_eth as u32).or_insert( + IdealAttestationRewards { + effective_balance: *effective_balance_eth, + head: 0, + target: 0, + source: 0, + }, + ); + match *flag_index { + TIMELY_SOURCE_FLAG_INDEX => entry.source += ideal_reward, + TIMELY_TARGET_FLAG_INDEX => entry.target += ideal_reward, + TIMELY_HEAD_FLAG_INDEX => entry.head += ideal_reward, + _ => {} + } + acc + }, + ) + .into_values() + .collect::>(); + ideal_rewards.sort_by(|a, b| a.effective_balance.cmp(&b.effective_balance)); + + Ok(StandardAttestationRewards { + ideal_rewards, + total_rewards, + }) + } +} diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index b60ce7efe5..04f601fad9 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -27,6 +27,11 @@ //! ▼ //! impl VerifiedAttestation //! ``` + +// Ignore this lint for `AttestationSlashInfo` which is of comparable size to the non-error types it +// is returned alongside. +#![allow(clippy::result_large_err)] + mod batch; use crate::{ diff --git a/beacon_node/beacon_chain/src/beacon_block_reward.rs b/beacon_node/beacon_chain/src/beacon_block_reward.rs new file mode 100644 index 0000000000..3f186c37c1 --- /dev/null +++ b/beacon_node/beacon_chain/src/beacon_block_reward.rs @@ -0,0 +1,237 @@ +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use eth2::lighthouse::StandardBlockReward; +use operation_pool::RewardCache; +use safe_arith::SafeArith; +use slog::error; +use state_processing::{ + common::{ + altair, get_attestation_participation_flag_indices, get_attesting_indices_from_state, + }, + per_block_processing::{ + altair::sync_committee::compute_sync_aggregate_rewards, get_slashable_indices, + }, +}; +use store::{ + consts::altair::{PARTICIPATION_FLAG_WEIGHTS, PROPOSER_WEIGHT, WEIGHT_DENOMINATOR}, + RelativeEpoch, +}; +use types::{BeaconBlockRef, BeaconState, BeaconStateError, ExecPayload, Hash256}; + +type BeaconBlockSubRewardValue = u64; + +impl BeaconChain { + pub fn compute_beacon_block_reward>( + &self, + block: BeaconBlockRef<'_, T::EthSpec, Payload>, + block_root: Hash256, + state: &mut BeaconState, + ) -> Result { + if block.slot() != state.slot() { + return Err(BeaconChainError::BlockRewardSlotError); + } + + state.build_committee_cache(RelativeEpoch::Previous, &self.spec)?; + state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; + + let proposer_index = block.proposer_index(); + + let sync_aggregate_reward = + self.compute_beacon_block_sync_aggregate_reward(block, state)?; + + let proposer_slashing_reward = self + .compute_beacon_block_proposer_slashing_reward(block, state) + .map_err(|e| { + error!( + self.log, + "Error calculating proposer slashing reward"; + "error" => ?e + ); + BeaconChainError::BlockRewardError + })?; + + let attester_slashing_reward = self + .compute_beacon_block_attester_slashing_reward(block, state) + .map_err(|e| { + error!( + self.log, + "Error calculating attester slashing reward"; + "error" => ?e + ); + BeaconChainError::BlockRewardError + })?; + + let block_attestation_reward = if let BeaconState::Base(_) = state { + self.compute_beacon_block_attestation_reward_base(block, block_root, state) + .map_err(|e| { + error!( + self.log, + "Error calculating base block attestation reward"; + "error" => ?e + ); + BeaconChainError::BlockRewardAttestationError + })? + } else { + self.compute_beacon_block_attestation_reward_altair(block, state) + .map_err(|e| { + error!( + self.log, + "Error calculating altair block attestation reward"; + "error" => ?e + ); + BeaconChainError::BlockRewardAttestationError + })? + }; + + let total_reward = sync_aggregate_reward + .safe_add(proposer_slashing_reward)? + .safe_add(attester_slashing_reward)? + .safe_add(block_attestation_reward)?; + + Ok(StandardBlockReward { + proposer_index, + total: total_reward, + attestations: block_attestation_reward, + sync_aggregate: sync_aggregate_reward, + proposer_slashings: proposer_slashing_reward, + attester_slashings: attester_slashing_reward, + }) + } + + fn compute_beacon_block_sync_aggregate_reward>( + &self, + block: BeaconBlockRef<'_, T::EthSpec, Payload>, + state: &BeaconState, + ) -> Result { + if let Ok(sync_aggregate) = block.body().sync_aggregate() { + let (_, proposer_reward_per_bit) = compute_sync_aggregate_rewards(state, &self.spec) + .map_err(|_| BeaconChainError::BlockRewardSyncError)?; + Ok(sync_aggregate.sync_committee_bits.num_set_bits() as u64 * proposer_reward_per_bit) + } else { + Ok(0) + } + } + + fn compute_beacon_block_proposer_slashing_reward>( + &self, + block: BeaconBlockRef<'_, T::EthSpec, Payload>, + state: &BeaconState, + ) -> Result { + let mut proposer_slashing_reward = 0; + + let proposer_slashings = block.body().proposer_slashings(); + + for proposer_slashing in proposer_slashings { + proposer_slashing_reward.safe_add_assign( + state + .get_validator(proposer_slashing.proposer_index() as usize)? + .effective_balance + .safe_div(self.spec.whistleblower_reward_quotient)?, + )?; + } + + Ok(proposer_slashing_reward) + } + + fn compute_beacon_block_attester_slashing_reward>( + &self, + block: BeaconBlockRef<'_, T::EthSpec, Payload>, + state: &BeaconState, + ) -> Result { + let mut attester_slashing_reward = 0; + + let attester_slashings = block.body().attester_slashings(); + + for attester_slashing in attester_slashings { + for attester_index in get_slashable_indices(state, attester_slashing)? { + attester_slashing_reward.safe_add_assign( + state + .get_validator(attester_index as usize)? + .effective_balance + .safe_div(self.spec.whistleblower_reward_quotient)?, + )?; + } + } + + Ok(attester_slashing_reward) + } + + fn compute_beacon_block_attestation_reward_base>( + &self, + block: BeaconBlockRef<'_, T::EthSpec, Payload>, + block_root: Hash256, + state: &BeaconState, + ) -> Result { + // Call compute_block_reward in the base case + // Since base does not have sync aggregate, we only grab attesation portion of the returned + // value + let mut reward_cache = RewardCache::default(); + let block_attestation_reward = self + .compute_block_reward(block, block_root, state, &mut reward_cache, true)? + .attestation_rewards + .total; + + Ok(block_attestation_reward) + } + + fn compute_beacon_block_attestation_reward_altair>( + &self, + block: BeaconBlockRef<'_, T::EthSpec, Payload>, + state: &mut BeaconState, + ) -> Result { + let total_active_balance = state.get_total_active_balance()?; + let base_reward_per_increment = + altair::BaseRewardPerIncrement::new(total_active_balance, &self.spec)?; + + let mut total_proposer_reward = 0; + + let proposer_reward_denominator = WEIGHT_DENOMINATOR + .safe_sub(PROPOSER_WEIGHT)? + .safe_mul(WEIGHT_DENOMINATOR)? + .safe_div(PROPOSER_WEIGHT)?; + + for attestation in block.body().attestations() { + let data = &attestation.data; + let inclusion_delay = state.slot().safe_sub(data.slot)?.as_u64(); + let participation_flag_indices = get_attestation_participation_flag_indices( + state, + data, + inclusion_delay, + &self.spec, + )?; + + let attesting_indices = get_attesting_indices_from_state(state, attestation)?; + + let mut proposer_reward_numerator = 0; + for index in attesting_indices { + let index = index as usize; + for (flag_index, &weight) in PARTICIPATION_FLAG_WEIGHTS.iter().enumerate() { + let epoch_participation = + state.get_epoch_participation_mut(data.target.epoch)?; + let validator_participation = epoch_participation + .get_mut(index) + .ok_or(BeaconStateError::ParticipationOutOfBounds(index))?; + + if participation_flag_indices.contains(&flag_index) + && !validator_participation.has_flag(flag_index)? + { + validator_participation.add_flag(flag_index)?; + proposer_reward_numerator.safe_add_assign( + altair::get_base_reward( + state, + index, + base_reward_per_increment, + &self.spec, + )? + .safe_mul(weight)?, + )?; + } + } + } + total_proposer_reward.safe_add_assign( + proposer_reward_numerator.safe_div(proposer_reward_denominator)?, + )?; + } + + Ok(total_proposer_reward) + } +} diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 1092bf45b6..561be515bf 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -960,7 +960,9 @@ impl BeaconChain { .ok_or(Error::ExecutionLayerMissing)? .get_payload_by_block_hash(exec_block_hash) .await - .map_err(|e| Error::ExecutionLayerErrorPayloadReconstruction(exec_block_hash, e))? + .map_err(|e| { + Error::ExecutionLayerErrorPayloadReconstruction(exec_block_hash, Box::new(e)) + })? .ok_or(Error::BlockHashMissingFromExecutionLayer(exec_block_hash))?; // Verify payload integrity. @@ -977,8 +979,6 @@ impl BeaconChain { return Err(Error::InconsistentPayloadReconstructed { slot: blinded_block.slot(), exec_block_hash, - canonical_payload_root: execution_payload_header.tree_hash_root(), - reconstructed_payload_root: header_from_payload.tree_hash_root(), canonical_transactions_root: execution_payload_header.transactions_root, reconstructed_transactions_root: header_from_payload.transactions_root, }); diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 623b025bc1..07619fb82c 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -42,6 +42,11 @@ //! END //! //! ``` + +// Ignore this lint for `BlockSlashInfo` which is of comparable size to the non-error types it is +// returned alongside. +#![allow(clippy::result_large_err)] + use crate::beacon_snapshot::PreProcessingSnapshot; use crate::eth1_finalization_cache::Eth1FinalizationData; use crate::execution_payload::{ diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index be134fcfdd..e38c11be74 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -50,7 +50,6 @@ pub enum BeaconChainError { }, SlotClockDidNotStart, NoStateForSlot(Slot), - UnableToFindTargetRoot(Slot), BeaconStateError(BeaconStateError), DBInconsistent(String), DBError(store::Error), @@ -138,13 +137,11 @@ pub enum BeaconChainError { BuilderMissing, ExecutionLayerMissing, BlockVariantLacksExecutionPayload(Hash256), - ExecutionLayerErrorPayloadReconstruction(ExecutionBlockHash, execution_layer::Error), + ExecutionLayerErrorPayloadReconstruction(ExecutionBlockHash, Box), BlockHashMissingFromExecutionLayer(ExecutionBlockHash), InconsistentPayloadReconstructed { slot: Slot, exec_block_hash: ExecutionBlockHash, - canonical_payload_root: Hash256, - reconstructed_payload_root: Hash256, canonical_transactions_root: Hash256, reconstructed_transactions_root: Hash256, }, @@ -154,9 +151,12 @@ pub enum BeaconChainError { ExecutionForkChoiceUpdateInvalid { status: PayloadStatus, }, + BlockRewardError, BlockRewardSlotError, BlockRewardAttestationError, BlockRewardSyncError, + SyncCommitteeRewardsSyncError, + AttestationRewardsError, HeadMissingFromForkChoice(Hash256), FinalizedBlockMissingFromForkChoice(Hash256), HeadBlockMissingFromForkChoice(Hash256), diff --git a/beacon_node/beacon_chain/src/fork_choice_signal.rs b/beacon_node/beacon_chain/src/fork_choice_signal.rs index fd92de661d..f5424d417e 100644 --- a/beacon_node/beacon_chain/src/fork_choice_signal.rs +++ b/beacon_node/beacon_chain/src/fork_choice_signal.rs @@ -43,7 +43,7 @@ impl ForkChoiceSignalTx { /// /// Return an error if the provided `slot` is strictly less than any previously provided slot. pub fn notify_fork_choice_complete(&self, slot: Slot) -> Result<(), BeaconChainError> { - let &(ref lock, ref condvar) = &*self.pair; + let (lock, condvar) = &*self.pair; let mut current_slot = lock.lock(); @@ -72,7 +72,7 @@ impl Default for ForkChoiceSignalTx { impl ForkChoiceSignalRx { pub fn wait_for_fork_choice(&self, slot: Slot, timeout: Duration) -> ForkChoiceWaitResult { - let &(ref lock, ref condvar) = &*self.pair; + let (lock, condvar) = &*self.pair; let mut current_slot = lock.lock(); diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index f3d4683b1b..90b3d70129 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -1,6 +1,8 @@ #![recursion_limit = "128"] // For lazy-static +pub mod attestation_rewards; pub mod attestation_verification; mod attester_cache; +pub mod beacon_block_reward; mod beacon_chain; mod beacon_fork_choice_store; pub mod beacon_proposer_cache; @@ -39,6 +41,7 @@ pub mod proposer_prep_service; pub mod schema_change; mod shuffling_cache; pub mod state_advance_timer; +pub mod sync_committee_rewards; pub mod sync_committee_verification; pub mod test_utils; mod timeout_rw_lock; diff --git a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs index ec9c90e735..20d7181808 100644 --- a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs +++ b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs @@ -2,6 +2,7 @@ use crate::{ beacon_chain::MAXIMUM_GOSSIP_CLOCK_DISPARITY, BeaconChain, BeaconChainError, BeaconChainTypes, }; use derivative::Derivative; +use eth2::types::Hash256; use slot_clock::SlotClock; use std::time::Duration; use strum::AsRefStr; @@ -36,6 +37,8 @@ pub enum Error { SigSlotStartIsNone, /// Failed to construct a LightClientOptimisticUpdate from state. FailedConstructingUpdate, + /// Unknown block with parent root. + UnknownBlockParentRoot(Hash256), /// Beacon chain error occured. BeaconChainError(BeaconChainError), LightClientUpdateError(LightClientUpdateError), @@ -58,6 +61,7 @@ impl From for Error { #[derivative(Clone(bound = "T: BeaconChainTypes"))] pub struct VerifiedLightClientOptimisticUpdate { light_client_optimistic_update: LightClientOptimisticUpdate, + pub parent_root: Hash256, seen_timestamp: Duration, } @@ -107,6 +111,16 @@ impl VerifiedLightClientOptimisticUpdate { None => return Err(Error::SigSlotStartIsNone), } + // check if we can process the optimistic update immediately + // otherwise queue + let canonical_root = light_client_optimistic_update + .attested_header + .canonical_root(); + + if canonical_root != head_block.message().parent_root() { + return Err(Error::UnknownBlockParentRoot(canonical_root)); + } + let optimistic_update = LightClientOptimisticUpdate::new(&chain.spec, head_block, &attested_state)?; @@ -119,6 +133,7 @@ impl VerifiedLightClientOptimisticUpdate { Ok(Self { light_client_optimistic_update, + parent_root: canonical_root, seen_timestamp, }) } diff --git a/beacon_node/beacon_chain/src/sync_committee_rewards.rs b/beacon_node/beacon_chain/src/sync_committee_rewards.rs new file mode 100644 index 0000000000..561fed1a86 --- /dev/null +++ b/beacon_node/beacon_chain/src/sync_committee_rewards.rs @@ -0,0 +1,87 @@ +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; + +use eth2::lighthouse::SyncCommitteeReward; +use safe_arith::SafeArith; +use slog::error; +use state_processing::per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards; +use std::collections::HashMap; +use store::RelativeEpoch; +use types::{BeaconBlockRef, BeaconState, ExecPayload}; + +impl BeaconChain { + pub fn compute_sync_committee_rewards>( + &self, + block: BeaconBlockRef<'_, T::EthSpec, Payload>, + state: &mut BeaconState, + ) -> Result, BeaconChainError> { + if block.slot() != state.slot() { + return Err(BeaconChainError::BlockRewardSlotError); + } + + let spec = &self.spec; + + state.build_committee_cache(RelativeEpoch::Current, spec)?; + + let sync_aggregate = block.body().sync_aggregate()?; + + let sync_committee = state.current_sync_committee()?.clone(); + + let sync_committee_indices = state.get_sync_committee_indices(&sync_committee)?; + + let (participant_reward_value, proposer_reward_per_bit) = + compute_sync_aggregate_rewards(state, spec).map_err(|e| { + error!( + self.log, "Error calculating sync aggregate rewards"; + "error" => ?e + ); + BeaconChainError::SyncCommitteeRewardsSyncError + })?; + + let mut balances = HashMap::::new(); + + let mut total_proposer_rewards = 0; + let proposer_index = state.get_beacon_proposer_index(block.slot(), spec)?; + + // Apply rewards to participant balances. Keep track of proposer rewards + for (validator_index, participant_bit) in sync_committee_indices + .iter() + .zip(sync_aggregate.sync_committee_bits.iter()) + { + let participant_balance = balances + .entry(*validator_index) + .or_insert_with(|| state.balances()[*validator_index]); + + if participant_bit { + participant_balance.safe_add_assign(participant_reward_value)?; + + balances + .entry(proposer_index) + .or_insert_with(|| state.balances()[proposer_index]) + .safe_add_assign(proposer_reward_per_bit)?; + + total_proposer_rewards.safe_add_assign(proposer_reward_per_bit)?; + } else { + *participant_balance = participant_balance.saturating_sub(participant_reward_value); + } + } + + Ok(balances + .iter() + .filter_map(|(i, new_balance)| { + let reward = if *i != proposer_index { + *new_balance as i64 - state.balances()[*i] as i64 + } else if sync_committee_indices.contains(i) { + *new_balance as i64 + - state.balances()[*i] as i64 + - total_proposer_rewards as i64 + } else { + return None; + }; + Some(SyncCommitteeReward { + validator_index: *i as u64, + reward, + }) + }) + .collect()) + } +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 26113af552..6f3704e254 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -2,6 +2,7 @@ pub use crate::persisted_beacon_chain::PersistedBeaconChain; pub use crate::{ beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY}, migrate::MigratorConfig, + sync_committee_verification::Error as SyncCommitteeError, validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, BeaconChainError, NotifyExecutionLayer, ProduceBlockVerification, }; @@ -1973,6 +1974,30 @@ where (honest_head, faulty_head) } + + pub fn process_sync_contributions( + &self, + sync_contributions: HarnessSyncContributions, + ) -> Result<(), SyncCommitteeError> { + let mut verified_contributions = Vec::with_capacity(sync_contributions.len()); + + for (_, contribution_and_proof) in sync_contributions { + let signed_contribution_and_proof = contribution_and_proof.unwrap(); + + let verified_contribution = self + .chain + .verify_sync_contribution_for_gossip(signed_contribution_and_proof)?; + + verified_contributions.push(verified_contribution); + } + + for verified_contribution in verified_contributions { + self.chain + .add_contribution_to_block_inclusion_pool(verified_contribution)?; + } + + Ok(()) + } } // Junk `Debug` impl to satistfy certain trait bounds during testing. diff --git a/beacon_node/beacon_chain/tests/main.rs b/beacon_node/beacon_chain/tests/main.rs index 1c61e9927f..eceb4f2e85 100644 --- a/beacon_node/beacon_chain/tests/main.rs +++ b/beacon_node/beacon_chain/tests/main.rs @@ -4,6 +4,7 @@ mod block_verification; mod merge; mod op_verification; mod payload_invalidation; +mod rewards; mod store_tests; mod sync_committee_verification; mod tests; diff --git a/beacon_node/beacon_chain/tests/rewards.rs b/beacon_node/beacon_chain/tests/rewards.rs new file mode 100644 index 0000000000..b61bea1242 --- /dev/null +++ b/beacon_node/beacon_chain/tests/rewards.rs @@ -0,0 +1,121 @@ +#![cfg(test)] + +use std::collections::HashMap; + +use beacon_chain::test_utils::{ + generate_deterministic_keypairs, BeaconChainHarness, EphemeralHarnessType, +}; +use beacon_chain::{ + test_utils::{AttestationStrategy, BlockStrategy, RelativeSyncCommittee}, + types::{Epoch, EthSpec, Keypair, MinimalEthSpec}, +}; +use lazy_static::lazy_static; + +pub const VALIDATOR_COUNT: usize = 64; + +lazy_static! { + static ref KEYPAIRS: Vec = generate_deterministic_keypairs(VALIDATOR_COUNT); +} + +fn get_harness() -> BeaconChainHarness> { + let mut spec = E::default_spec(); + + spec.altair_fork_epoch = Some(Epoch::new(0)); // We use altair for all tests + + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec) + .keypairs(KEYPAIRS.to_vec()) + .fresh_ephemeral_store() + .build(); + + harness.advance_slot(); + + harness +} + +#[tokio::test] +async fn test_sync_committee_rewards() { + let num_block_produced = MinimalEthSpec::slots_per_epoch(); + let harness = get_harness::(); + + let latest_block_root = harness + .extend_chain( + num_block_produced as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Create and add sync committee message to op_pool + let sync_contributions = harness.make_sync_contributions( + &harness.get_current_state(), + latest_block_root, + harness.get_current_slot(), + RelativeSyncCommittee::Current, + ); + + harness + .process_sync_contributions(sync_contributions) + .unwrap(); + + // Add block + let chain = &harness.chain; + let (head_state, head_state_root) = harness.get_current_state_and_root(); + let target_slot = harness.get_current_slot() + 1; + + let (block_root, mut state) = harness + .add_attested_block_at_slot(target_slot, head_state, head_state_root, &[]) + .await + .unwrap(); + + let block = harness.get_block(block_root).unwrap(); + let parent_block = chain + .get_blinded_block(&block.parent_root()) + .unwrap() + .unwrap(); + let parent_state = chain + .get_state(&parent_block.state_root(), Some(parent_block.slot())) + .unwrap() + .unwrap(); + + let reward_payload = chain + .compute_sync_committee_rewards(block.message(), &mut state) + .unwrap(); + + let rewards = reward_payload + .iter() + .map(|reward| (reward.validator_index, reward.reward)) + .collect::>(); + + let proposer_index = state + .get_beacon_proposer_index(target_slot, &MinimalEthSpec::default_spec()) + .unwrap(); + + let mut mismatches = vec![]; + + for validator in state.validators() { + let validator_index = state + .clone() + .get_validator_index(&validator.pubkey) + .unwrap() + .unwrap(); + let pre_state_balance = parent_state.balances()[validator_index]; + let post_state_balance = state.balances()[validator_index]; + let sync_committee_reward = rewards.get(&(validator_index as u64)).unwrap_or(&0); + + if validator_index == proposer_index { + continue; // Ignore proposer + } + + if pre_state_balance as i64 + *sync_committee_reward != post_state_balance as i64 { + mismatches.push(validator_index.to_string()); + } + } + + assert_eq!( + mismatches.len(), + 0, + "Expect 0 mismatches, but these validators have mismatches on balance: {} ", + mismatches.join(",") + ); +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 606b7ecca0..d7323f01ed 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -15,7 +15,9 @@ mod database; mod metrics; mod proposer_duties; mod publish_blocks; +mod standard_block_rewards; mod state_id; +mod sync_committee_rewards; mod sync_committees; mod ui; mod validator_inclusion; @@ -1719,6 +1721,114 @@ pub fn serve( }, ); + let beacon_rewards_path = eth_v1 + .and(warp::path("beacon")) + .and(warp::path("rewards")) + .and(chain_filter.clone()); + + // GET beacon/rewards/blocks/{block_id} + let get_beacon_rewards_blocks = beacon_rewards_path + .clone() + .and(warp::path("blocks")) + .and(block_id_or_err) + .and(warp::path::end()) + .and_then(|chain: Arc>, block_id: BlockId| { + blocking_json_task(move || { + let (rewards, execution_optimistic) = + standard_block_rewards::compute_beacon_block_rewards(chain, block_id)?; + Ok(rewards) + .map(api_types::GenericResponse::from) + .map(|resp| resp.add_execution_optimistic(execution_optimistic)) + }) + }); + + /* + * beacon/rewards + */ + + let beacon_rewards_path = eth_v1 + .and(warp::path("beacon")) + .and(warp::path("rewards")) + .and(chain_filter.clone()); + + // POST beacon/rewards/attestations/{epoch} + let post_beacon_rewards_attestations = beacon_rewards_path + .clone() + .and(warp::path("attestations")) + .and(warp::path::param::()) + .and(warp::path::end()) + .and(warp::body::json()) + .and(log_filter.clone()) + .and_then( + |chain: Arc>, + epoch: Epoch, + validators: Vec, + log: Logger| { + blocking_json_task(move || { + let attestation_rewards = chain + .compute_attestation_rewards(epoch, validators, log) + .map_err(|e| match e { + BeaconChainError::MissingBeaconState(root) => { + warp_utils::reject::custom_not_found(format!( + "missing state {root:?}", + )) + } + BeaconChainError::NoStateForSlot(slot) => { + warp_utils::reject::custom_not_found(format!( + "missing state at slot {slot}" + )) + } + BeaconChainError::BeaconStateError( + BeaconStateError::UnknownValidator(validator_index), + ) => warp_utils::reject::custom_bad_request(format!( + "validator is unknown: {validator_index}" + )), + BeaconChainError::ValidatorPubkeyUnknown(pubkey) => { + warp_utils::reject::custom_bad_request(format!( + "validator pubkey is unknown: {pubkey:?}" + )) + } + e => warp_utils::reject::custom_server_error(format!( + "unexpected error: {:?}", + e + )), + })?; + let execution_optimistic = + chain.is_optimistic_or_invalid_head().unwrap_or_default(); + + Ok(attestation_rewards) + .map(api_types::GenericResponse::from) + .map(|resp| resp.add_execution_optimistic(execution_optimistic)) + }) + }, + ); + + // POST beacon/rewards/sync_committee/{block_id} + let post_beacon_rewards_sync_committee = beacon_rewards_path + .clone() + .and(warp::path("sync_committee")) + .and(block_id_or_err) + .and(warp::path::end()) + .and(warp::body::json()) + .and(log_filter.clone()) + .and_then( + |chain: Arc>, + block_id: BlockId, + validators: Vec, + log: Logger| { + blocking_json_task(move || { + let (rewards, execution_optimistic) = + sync_committee_rewards::compute_sync_committee_rewards( + chain, block_id, validators, log, + )?; + + Ok(rewards) + .map(api_types::GenericResponse::from) + .map(|resp| resp.add_execution_optimistic(execution_optimistic)) + }) + }, + ); + /* * config */ @@ -3365,6 +3475,7 @@ pub fn serve( .or(get_beacon_pool_proposer_slashings.boxed()) .or(get_beacon_pool_voluntary_exits.boxed()) .or(get_beacon_deposit_snapshot.boxed()) + .or(get_beacon_rewards_blocks.boxed()) .or(get_config_fork_schedule.boxed()) .or(get_config_spec.boxed()) .or(get_config_deposit_contract.boxed()) @@ -3416,6 +3527,8 @@ pub fn serve( .or(post_beacon_pool_proposer_slashings.boxed()) .or(post_beacon_pool_voluntary_exits.boxed()) .or(post_beacon_pool_sync_committees.boxed()) + .or(post_beacon_rewards_attestations.boxed()) + .or(post_beacon_rewards_sync_committee.boxed()) .or(post_validator_duties_attester.boxed()) .or(post_validator_duties_sync.boxed()) .or(post_validator_aggregate_and_proofs.boxed()) diff --git a/beacon_node/http_api/src/standard_block_rewards.rs b/beacon_node/http_api/src/standard_block_rewards.rs new file mode 100644 index 0000000000..b3c90d08a4 --- /dev/null +++ b/beacon_node/http_api/src/standard_block_rewards.rs @@ -0,0 +1,27 @@ +use crate::sync_committee_rewards::get_state_before_applying_block; +use crate::BlockId; +use crate::ExecutionOptimistic; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use eth2::lighthouse::StandardBlockReward; +use std::sync::Arc; +use warp_utils::reject::beacon_chain_error; +//// The difference between block_rewards and beacon_block_rewards is the later returns block +//// reward format that satisfies beacon-api specs +pub fn compute_beacon_block_rewards( + chain: Arc>, + block_id: BlockId, +) -> Result<(StandardBlockReward, ExecutionOptimistic), warp::Rejection> { + let (block, execution_optimistic) = block_id.blinded_block(&chain)?; + + let block_ref = block.message(); + + let block_root = block.canonical_root(); + + let mut state = get_state_before_applying_block(chain.clone(), &block)?; + + let rewards = chain + .compute_beacon_block_reward(block_ref, block_root, &mut state) + .map_err(beacon_chain_error)?; + + Ok((rewards, execution_optimistic)) +} diff --git a/beacon_node/http_api/src/sync_committee_rewards.rs b/beacon_node/http_api/src/sync_committee_rewards.rs new file mode 100644 index 0000000000..cefa98db41 --- /dev/null +++ b/beacon_node/http_api/src/sync_committee_rewards.rs @@ -0,0 +1,77 @@ +use crate::{BlockId, ExecutionOptimistic}; +use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use eth2::lighthouse::SyncCommitteeReward; +use eth2::types::ValidatorId; +use slog::{debug, Logger}; +use state_processing::BlockReplayer; +use std::sync::Arc; +use types::{BeaconState, SignedBlindedBeaconBlock}; +use warp_utils::reject::{beacon_chain_error, custom_not_found}; + +pub fn compute_sync_committee_rewards( + chain: Arc>, + block_id: BlockId, + validators: Vec, + log: Logger, +) -> Result<(Option>, ExecutionOptimistic), warp::Rejection> { + let (block, execution_optimistic) = block_id.blinded_block(&chain)?; + + let mut state = get_state_before_applying_block(chain.clone(), &block)?; + + let reward_payload = chain + .compute_sync_committee_rewards(block.message(), &mut state) + .map_err(beacon_chain_error)?; + + let data = if reward_payload.is_empty() { + debug!(log, "compute_sync_committee_rewards returned empty"); + None + } else if validators.is_empty() { + Some(reward_payload) + } else { + Some( + reward_payload + .into_iter() + .filter(|reward| { + validators.iter().any(|validator| match validator { + ValidatorId::Index(i) => reward.validator_index == *i, + ValidatorId::PublicKey(pubkey) => match state.get_validator_index(pubkey) { + Ok(Some(i)) => reward.validator_index == i as u64, + _ => false, + }, + }) + }) + .collect::>(), + ) + }; + + Ok((data, execution_optimistic)) +} + +pub fn get_state_before_applying_block( + chain: Arc>, + block: &SignedBlindedBeaconBlock, +) -> Result, warp::reject::Rejection> { + let parent_block: SignedBlindedBeaconBlock = chain + .get_blinded_block(&block.parent_root()) + .and_then(|maybe_block| { + maybe_block.ok_or_else(|| BeaconChainError::MissingBeaconBlock(block.parent_root())) + }) + .map_err(|e| custom_not_found(format!("Parent block is not available! {:?}", e)))?; + + let parent_state = chain + .get_state(&parent_block.state_root(), Some(parent_block.slot())) + .and_then(|maybe_state| { + maybe_state + .ok_or_else(|| BeaconChainError::MissingBeaconState(parent_block.state_root())) + }) + .map_err(|e| custom_not_found(format!("Parent state is not available! {:?}", e)))?; + + let replayer = BlockReplayer::new(parent_state, &chain.spec) + .no_signature_verification() + .state_root_iter([Ok((parent_block.state_root(), parent_block.slot()))].into_iter()) + .minimal_block_root_verification() + .apply_blocks(vec![], Some(block.slot())) + .map_err(beacon_chain_error)?; + + Ok(replayer.into_state()) +} diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 743a97a29c..8118443a65 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -67,7 +67,8 @@ use types::{ SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; use work_reprocessing_queue::{ - spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, + spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, + QueuedUnaggregate, ReadyWork, }; use worker::{Toolbox, Worker}; @@ -137,6 +138,10 @@ const MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN: usize = 1_024; /// before we start dropping them. const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024; +/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored +/// for reprocessing before we start dropping them. +const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128; + /// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping /// them. const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048; @@ -213,6 +218,7 @@ pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; +pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update"; /// A simple first-in-first-out queue with a maximum length. struct FifoQueue { @@ -694,6 +700,21 @@ impl std::convert::From> for WorkEvent { seen_timestamp, }, }, + ReadyWork::LightClientUpdate(QueuedLightClientUpdate { + peer_id, + message_id, + light_client_optimistic_update, + seen_timestamp, + .. + }) => Self { + drop_during_sync: true, + work: Work::UnknownLightClientOptimisticUpdate { + message_id, + peer_id, + light_client_optimistic_update, + seen_timestamp, + }, + }, } } } @@ -733,6 +754,12 @@ pub enum Work { aggregate: Box>, seen_timestamp: Duration, }, + UnknownLightClientOptimisticUpdate { + message_id: MessageId, + peer_id: PeerId, + light_client_optimistic_update: Box>, + seen_timestamp: Duration, + }, GossipAggregateBatch { packages: Vec>, }, @@ -845,6 +872,7 @@ impl Work { Work::LightClientBootstrapRequest { .. } => LIGHT_CLIENT_BOOTSTRAP_REQUEST, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, + Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE, } } } @@ -979,6 +1007,8 @@ impl BeaconProcessor { // Using a FIFO queue for light client updates to maintain sequence order. let mut finality_update_queue = FifoQueue::new(MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN); let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN); + let mut unknown_light_client_update_queue = + FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN); // Using a FIFO queue since blocks need to be imported sequentially. let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); @@ -1346,6 +1376,9 @@ impl BeaconProcessor { Work::UnknownBlockAggregate { .. } => { unknown_block_aggregate_queue.push(work) } + Work::UnknownLightClientOptimisticUpdate { .. } => { + unknown_light_client_update_queue.push(work, work_id, &self.log) + } } } } @@ -1665,6 +1698,7 @@ impl BeaconProcessor { message_id, peer_id, *light_client_optimistic_update, + Some(work_reprocessing_tx), seen_timestamp, ) }), @@ -1787,6 +1821,20 @@ impl BeaconProcessor { seen_timestamp, ) }), + Work::UnknownLightClientOptimisticUpdate { + message_id, + peer_id, + light_client_optimistic_update, + seen_timestamp, + } => task_spawner.spawn_blocking(move || { + worker.process_gossip_optimistic_update( + message_id, + peer_id, + *light_client_optimistic_update, + None, + seen_timestamp, + ) + }), }; } } diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 2aeec11c32..8c568a7eef 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -19,7 +19,7 @@ use futures::task::Poll; use futures::{Stream, StreamExt}; use lighthouse_network::{MessageId, PeerId}; use logging::TimeLatch; -use slog::{crit, debug, error, warn, Logger}; +use slog::{crit, debug, error, trace, warn, Logger}; use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; use std::pin::Pin; @@ -30,12 +30,16 @@ use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::time::error::Error as TimeError; use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; -use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId}; +use types::{ + Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, + SignedBeaconBlock, SubnetId, +}; const TASK_NAME: &str = "beacon_processor_reprocess_queue"; const GOSSIP_BLOCKS: &str = "gossip_blocks"; const RPC_BLOCKS: &str = "rpc_blocks"; const ATTESTATIONS: &str = "attestations"; +const LIGHT_CLIENT_UPDATES: &str = "lc_updates"; /// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts. /// This is to account for any slight drift in the system clock. @@ -44,6 +48,9 @@ const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5); /// For how long to queue aggregated and unaggregated attestations for re-processing. pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); +/// For how long to queue light client updates for re-processing. +pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12); + /// For how long to queue rpc blocks before sending them back for reprocessing. pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3); @@ -55,6 +62,9 @@ const MAXIMUM_QUEUED_BLOCKS: usize = 16; /// How many attestations we keep before new ones get dropped. const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; +/// How many light client updates we keep before new ones get dropped. +const MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES: usize = 128; + /// Messages that the scheduler can receive. pub enum ReprocessQueueMessage { /// A block that has been received early and we should queue for later processing. @@ -62,13 +72,18 @@ pub enum ReprocessQueueMessage { /// A gossip block for hash `X` is being imported, we should queue the rpc block for the same /// hash until the gossip block is imported. RpcBlock(QueuedRpcBlock), - /// A block that was successfully processed. We use this to handle attestations for unknown - /// blocks. - BlockImported(Hash256), + /// A block that was successfully processed. We use this to handle attestations and light client updates + /// for unknown blocks. + BlockImported { + block_root: Hash256, + parent_root: Hash256, + }, /// An unaggregated attestation that references an unknown block. UnknownBlockUnaggregate(QueuedUnaggregate), /// An aggregated attestation that references an unknown block. UnknownBlockAggregate(QueuedAggregate), + /// A light client optimistic update that references a parent root that has not been seen as a parent. + UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate), } /// Events sent by the scheduler once they are ready for re-processing. @@ -77,6 +92,7 @@ pub enum ReadyWork { RpcBlock(QueuedRpcBlock), Unaggregate(QueuedUnaggregate), Aggregate(QueuedAggregate), + LightClientUpdate(QueuedLightClientUpdate), } /// An Attestation for which the corresponding block was not seen while processing, queued for @@ -99,6 +115,16 @@ pub struct QueuedAggregate { pub seen_timestamp: Duration, } +/// A light client update for which the corresponding parent block was not seen while processing, +/// queued for later. +pub struct QueuedLightClientUpdate { + pub peer_id: PeerId, + pub message_id: MessageId, + pub light_client_optimistic_update: Box>, + pub parent_root: Hash256, + pub seen_timestamp: Duration, +} + /// A block that arrived early and has been queued for later import. pub struct QueuedGossipBlock { pub peer_id: PeerId, @@ -127,6 +153,8 @@ enum InboundEvent { ReadyRpcBlock(QueuedRpcBlock), /// An aggregated or unaggregated attestation is ready for re-processing. ReadyAttestation(QueuedAttestationId), + /// A light client update that is ready for re-processing. + ReadyLightClientUpdate(QueuedLightClientUpdateId), /// A `DelayQueue` returned an error. DelayQueueError(TimeError, &'static str), /// A message sent to the `ReprocessQueue` @@ -147,6 +175,8 @@ struct ReprocessQueue { rpc_block_delay_queue: DelayQueue>, /// Queue to manage scheduled attestations. attestations_delay_queue: DelayQueue, + /// Queue to manage scheduled light client updates. + lc_updates_delay_queue: DelayQueue, /* Queued items */ /// Queued blocks. @@ -157,15 +187,23 @@ struct ReprocessQueue { queued_unaggregates: FnvHashMap, DelayKey)>, /// Attestations (aggregated and unaggregated) per root. awaiting_attestations_per_root: HashMap>, + /// Queued Light Client Updates. + queued_lc_updates: FnvHashMap, DelayKey)>, + /// Light Client Updates per parent_root. + awaiting_lc_updates_per_parent_root: HashMap>, /* Aux */ /// Next attestation id, used for both aggregated and unaggregated attestations next_attestation: usize, + next_lc_update: usize, early_block_debounce: TimeLatch, rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, + lc_update_delay_debounce: TimeLatch, } +pub type QueuedLightClientUpdateId = usize; + #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum QueuedAttestationId { Aggregate(usize), @@ -235,6 +273,20 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + match self.lc_updates_delay_queue.poll_expired(cx) { + Poll::Ready(Some(Ok(lc_id))) => { + return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate( + lc_id.into_inner(), + ))); + } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "lc_updates_queue"))); + } + // `Poll::Ready(None)` means that there are no more entries in the delay queue and we + // will continue to get this result until something else is added into the queue. + Poll::Ready(None) | Poll::Pending => (), + } + // Last empty the messages channel. match self.work_reprocessing_rx.poll_recv(cx) { Poll::Ready(Some(message)) => return Poll::Ready(Some(InboundEvent::Msg(message))), @@ -264,14 +316,19 @@ pub fn spawn_reprocess_scheduler( gossip_block_delay_queue: DelayQueue::new(), rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), + lc_updates_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), + queued_lc_updates: FnvHashMap::default(), queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), awaiting_attestations_per_root: HashMap::new(), + awaiting_lc_updates_per_parent_root: HashMap::new(), next_attestation: 0, + next_lc_update: 0, early_block_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), + lc_update_delay_debounce: TimeLatch::default(), }; executor.spawn( @@ -473,9 +530,49 @@ impl ReprocessQueue { self.next_attestation += 1; } - InboundEvent::Msg(BlockImported(root)) => { + InboundEvent::Msg(UnknownLightClientOptimisticUpdate( + queued_light_client_optimistic_update, + )) => { + if self.lc_updates_delay_queue.len() >= MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES { + if self.lc_update_delay_debounce.elapsed() { + error!( + log, + "Light client updates delay queue is full"; + "queue_size" => MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES, + "msg" => "check system clock" + ); + } + // Drop the light client update. + return; + } + + let lc_id: QueuedLightClientUpdateId = self.next_lc_update; + + // Register the delay. + let delay_key = self + .lc_updates_delay_queue + .insert(lc_id, QUEUED_LIGHT_CLIENT_UPDATE_DELAY); + + // Register the light client update for the corresponding root. + self.awaiting_lc_updates_per_parent_root + .entry(queued_light_client_optimistic_update.parent_root) + .or_default() + .push(lc_id); + + // Store the light client update and its info. + self.queued_lc_updates.insert( + self.next_lc_update, + (queued_light_client_optimistic_update, delay_key), + ); + + self.next_lc_update += 1; + } + InboundEvent::Msg(BlockImported { + block_root, + parent_root, + }) => { // Unqueue the attestations we have for this root, if any. - if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&root) { + if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) { for id in queued_ids { metrics::inc_counter( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS, @@ -511,12 +608,62 @@ impl ReprocessQueue { error!( log, "Unknown queued attestation for block root"; - "block_root" => ?root, + "block_root" => ?block_root, "att_id" => ?id, ); } } } + // Unqueue the light client optimistic updates we have for this root, if any. + if let Some(queued_lc_id) = self + .awaiting_lc_updates_per_parent_root + .remove(&parent_root) + { + debug!( + log, + "Dequeuing light client optimistic updates"; + "parent_root" => %parent_root, + "count" => queued_lc_id.len(), + ); + + for lc_id in queued_lc_id { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_OPTIMISTIC_UPDATES, + ); + if let Some((work, delay_key)) = self.queued_lc_updates.remove(&lc_id).map( + |(light_client_optimistic_update, delay_key)| { + ( + ReadyWork::LightClientUpdate(light_client_optimistic_update), + delay_key, + ) + }, + ) { + // Remove the delay + self.lc_updates_delay_queue.remove(&delay_key); + + // Send the work + match self.ready_work_tx.try_send(work) { + Ok(_) => trace!( + log, + "reprocessing light client update sent"; + ), + Err(_) => error!( + log, + "Failed to send scheduled light client update"; + ), + } + } else { + // There is a mismatch between the light client update ids registered for this + // root and the queued light client updates. This should never happen. + error!( + log, + "Unknown queued light client update for parent root"; + "parent_root" => ?parent_root, + "lc_id" => ?lc_id, + ); + } + } + } } // A block that was queued for later processing is now ready to be processed. InboundEvent::ReadyGossipBlock(ready_block) => { @@ -591,6 +738,38 @@ impl ReprocessQueue { } } } + InboundEvent::ReadyLightClientUpdate(queued_id) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_OPTIMISTIC_UPDATES, + ); + + if let Some((parent_root, work)) = self.queued_lc_updates.remove(&queued_id).map( + |(queued_lc_update, _delay_key)| { + ( + queued_lc_update.parent_root, + ReadyWork::LightClientUpdate(queued_lc_update), + ) + }, + ) { + if self.ready_work_tx.try_send(work).is_err() { + error!( + log, + "Failed to send scheduled light client optimistic update"; + ); + } + + if let Some(queued_lc_updates) = self + .awaiting_lc_updates_per_parent_root + .get_mut(&parent_root) + { + if let Some(index) = + queued_lc_updates.iter().position(|&id| id == queued_id) + { + queued_lc_updates.swap_remove(index); + } + } + } + } } metrics::set_gauge_vec( @@ -608,5 +787,10 @@ impl ReprocessQueue { &[ATTESTATIONS], self.attestations_delay_queue.len() as i64, ); + metrics::set_gauge_vec( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, + &[LIGHT_CLIENT_UPDATES], + self.lc_updates_delay_queue.len() as i64, + ); } } diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index ef23f6761f..3601ccb195 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -28,7 +28,8 @@ use types::{ use super::{ super::work_reprocessing_queue::{ - QueuedAggregate, QueuedGossipBlock, QueuedUnaggregate, ReprocessQueueMessage, + QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate, + ReprocessQueueMessage, }, Worker, }; @@ -715,6 +716,10 @@ impl Worker { &metrics::BEACON_BLOCK_GOSSIP_SLOT_START_DELAY_TIME, block_delay, ); + metrics::set_gauge( + &metrics::BEACON_BLOCK_LAST_DELAY, + block_delay.as_millis() as i64, + ); let verification_result = self .chain @@ -949,7 +954,10 @@ impl Worker { metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); if reprocess_tx - .try_send(ReprocessQueueMessage::BlockImported(block_root)) + .try_send(ReprocessQueueMessage::BlockImported { + block_root, + parent_root: block.message().parent_root(), + }) .is_err() { error!( @@ -1326,7 +1334,7 @@ impl Worker { LightClientFinalityUpdateError::InvalidLightClientFinalityUpdate => { debug!( self.log, - "LC invalid finality update"; + "Light client invalid finality update"; "peer" => %peer_id, "error" => ?e, ); @@ -1340,7 +1348,7 @@ impl Worker { LightClientFinalityUpdateError::TooEarly => { debug!( self.log, - "LC finality update too early"; + "Light client finality update too early"; "peer" => %peer_id, "error" => ?e, ); @@ -1353,7 +1361,7 @@ impl Worker { } LightClientFinalityUpdateError::FinalityUpdateAlreadySeen => debug!( self.log, - "LC finality update already seen"; + "Light client finality update already seen"; "peer" => %peer_id, "error" => ?e, ), @@ -1362,7 +1370,7 @@ impl Worker { | LightClientFinalityUpdateError::SigSlotStartIsNone | LightClientFinalityUpdateError::FailedConstructingUpdate => debug!( self.log, - "LC error constructing finality update"; + "Light client error constructing finality update"; "peer" => %peer_id, "error" => ?e, ), @@ -1377,22 +1385,77 @@ impl Worker { message_id: MessageId, peer_id: PeerId, light_client_optimistic_update: LightClientOptimisticUpdate, + reprocess_tx: Option>>, seen_timestamp: Duration, ) { - match self - .chain - .verify_optimistic_update_for_gossip(light_client_optimistic_update, seen_timestamp) - { - Ok(_verified_light_client_optimistic_update) => { + match self.chain.verify_optimistic_update_for_gossip( + light_client_optimistic_update.clone(), + seen_timestamp, + ) { + Ok(verified_light_client_optimistic_update) => { + debug!( + self.log, + "Light client successful optimistic update"; + "peer" => %peer_id, + "parent_root" => %verified_light_client_optimistic_update.parent_root, + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); } Err(e) => { - metrics::register_optimistic_update_error(&e); match e { - LightClientOptimisticUpdateError::InvalidLightClientOptimisticUpdate => { + LightClientOptimisticUpdateError::UnknownBlockParentRoot(parent_root) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_SENT_OPTIMISTIC_UPDATES, + ); debug!( self.log, - "LC invalid optimistic update"; + "Optimistic update for unknown block"; + "peer_id" => %peer_id, + "parent_root" => ?parent_root + ); + + if let Some(sender) = reprocess_tx { + let msg = ReprocessQueueMessage::UnknownLightClientOptimisticUpdate( + QueuedLightClientUpdate { + peer_id, + message_id, + light_client_optimistic_update: Box::new( + light_client_optimistic_update, + ), + parent_root, + seen_timestamp, + }, + ); + + if sender.try_send(msg).is_err() { + error!( + self.log, + "Failed to send optimistic update for re-processing"; + ) + } + } else { + debug!( + self.log, + "Not sending light client update because it had been reprocessed"; + "peer_id" => %peer_id, + "parent_root" => ?parent_root + ); + + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Ignore, + ); + } + return; + } + LightClientOptimisticUpdateError::InvalidLightClientOptimisticUpdate => { + metrics::register_optimistic_update_error(&e); + + debug!( + self.log, + "Light client invalid optimistic update"; "peer" => %peer_id, "error" => ?e, ); @@ -1404,9 +1467,10 @@ impl Worker { ) } LightClientOptimisticUpdateError::TooEarly => { + metrics::register_optimistic_update_error(&e); debug!( self.log, - "LC optimistic update too early"; + "Light client optimistic update too early"; "peer" => %peer_id, "error" => ?e, ); @@ -1417,21 +1481,29 @@ impl Worker { "light_client_gossip_error", ); } - LightClientOptimisticUpdateError::OptimisticUpdateAlreadySeen => debug!( - self.log, - "LC optimistic update already seen"; - "peer" => %peer_id, - "error" => ?e, - ), + LightClientOptimisticUpdateError::OptimisticUpdateAlreadySeen => { + metrics::register_optimistic_update_error(&e); + + debug!( + self.log, + "Light client optimistic update already seen"; + "peer" => %peer_id, + "error" => ?e, + ) + } LightClientOptimisticUpdateError::BeaconChainError(_) | LightClientOptimisticUpdateError::LightClientUpdateError(_) | LightClientOptimisticUpdateError::SigSlotStartIsNone - | LightClientOptimisticUpdateError::FailedConstructingUpdate => debug!( - self.log, - "LC error constructing optimistic update"; - "peer" => %peer_id, - "error" => ?e, - ), + | LightClientOptimisticUpdateError::FailedConstructingUpdate => { + metrics::register_optimistic_update_error(&e); + + debug!( + self.log, + "Light client error constructing optimistic update"; + "peer" => %peer_id, + "error" => ?e, + ) + } } self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 1ec045e97e..6e6e681550 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -84,6 +84,7 @@ impl Worker { } }; let slot = block.slot(); + let parent_root = block.message().parent_root(); let result = self .chain .process_block( @@ -101,7 +102,10 @@ impl Worker { info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash); // Trigger processing for work referencing this block. - let reprocess_msg = ReprocessQueueMessage::BlockImported(hash); + let reprocess_msg = ReprocessQueueMessage::BlockImported { + block_root: hash, + parent_root, + }; if reprocess_tx.try_send(reprocess_msg).is_err() { error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash) }; diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index b4f3f29f93..8dc76877a1 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -335,10 +335,18 @@ lazy_static! { pub static ref BEACON_BLOCK_GOSSIP_SLOT_START_DELAY_TIME: Result = try_create_histogram_with_buckets( "beacon_block_gossip_slot_start_delay_time", "Duration between when the block is received and the start of the slot it belongs to.", + // Create a custom bucket list for greater granularity in block delay + Ok(vec![0.1, 0.2, 0.3,0.4,0.5,0.75,1.0,1.25,1.5,1.75,2.0,2.5,3.0,3.5,4.0,5.0,6.0,7.0,8.0,9.0,10.0,15.0,20.0]) + // NOTE: Previous values, which we may want to switch back to. // [0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50] - decimal_buckets(-1,2) + //decimal_buckets(-1,2) ); + pub static ref BEACON_BLOCK_LAST_DELAY: Result = try_create_int_gauge( + "beacon_block_last_delay", + "Keeps track of the last block's delay from the start of the slot" + ); + pub static ref BEACON_BLOCK_GOSSIP_ARRIVED_LATE_TOTAL: Result = try_create_int_counter( "beacon_block_gossip_arrived_late_total", "Count of times when a gossip block arrived from the network later than the attestation deadline.", @@ -362,6 +370,21 @@ lazy_static! { "Number of queued attestations where as matching block has been imported." ); + /* + * Light client update reprocessing queue metrics. + */ + pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_OPTIMISTIC_UPDATES: Result = try_create_int_counter( + "beacon_processor_reprocessing_queue_expired_optimistic_updates", + "Number of queued light client optimistic updates which have expired before a matching block has been found." + ); + pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_OPTIMISTIC_UPDATES: Result = try_create_int_counter( + "beacon_processor_reprocessing_queue_matched_optimistic_updates", + "Number of queued light client optimistic updates where as matching block has been imported." + ); + pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_SENT_OPTIMISTIC_UPDATES: Result = try_create_int_counter( + "beacon_processor_reprocessing_queue_sent_optimistic_updates", + "Number of queued light client optimistic updates where as matching block has been imported." + ); } pub fn update_bandwidth_metrics(bandwidth: Arc) { diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 99d5facfe7..db0e9a8ac6 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -8,7 +8,7 @@ mod persistence; mod reward_cache; mod sync_aggregate_id; -pub use attestation::AttMaxCover; +pub use attestation::{earliest_attestation_validators, AttMaxCover}; pub use attestation_storage::{AttestationRef, SplitAttestation}; pub use max_cover::MaxCover; pub use persistence::{ diff --git a/book/src/checkpoint-sync.md b/book/src/checkpoint-sync.md index 893c545cb9..47dc03b20c 100644 --- a/book/src/checkpoint-sync.md +++ b/book/src/checkpoint-sync.md @@ -97,7 +97,7 @@ You can opt-in to reconstructing all of the historic states by providing the The database keeps track of three markers to determine the availability of historic blocks and states: -* `oldest_block_slot`: All blocks with slots less than or equal to this value are available in the +* `oldest_block_slot`: All blocks with slots greater than or equal to this value are available in the database. Additionally, the genesis block is always available. * `state_lower_limit`: All states with slots _less than or equal to_ this value are available in the database. The minimum value is 0, indicating that the genesis state is always available. diff --git a/common/compare_fields/src/lib.rs b/common/compare_fields/src/lib.rs index a0166eb500..bc2f5446ad 100644 --- a/common/compare_fields/src/lib.rs +++ b/common/compare_fields/src/lib.rs @@ -115,11 +115,7 @@ impl Comparison { let mut children = vec![]; for i in 0..std::cmp::max(a.len(), b.len()) { - children.push(FieldComparison::new( - format!("{:}", i), - &a.get(i), - &b.get(i), - )); + children.push(FieldComparison::new(format!("{i}"), &a.get(i), &b.get(i))); } Self::parent(field_name, a == b, children) @@ -164,8 +160,8 @@ impl FieldComparison { Self { field_name, equal: a == b, - a: format!("{:?}", a), - b: format!("{:?}", b), + a: format!("{a:?}"), + b: format!("{b:?}"), } } diff --git a/common/compare_fields_derive/src/lib.rs b/common/compare_fields_derive/src/lib.rs index beabc6ca9b..752c09ee05 100644 --- a/common/compare_fields_derive/src/lib.rs +++ b/common/compare_fields_derive/src/lib.rs @@ -32,7 +32,7 @@ pub fn compare_fields_derive(input: TokenStream) -> TokenStream { _ => panic!("compare_fields_derive only supports named struct fields."), }; - let field_name = format!("{:}", ident_a); + let field_name = ident_a.to_string(); let ident_b = ident_a.clone(); let quote = if is_slice(field) { diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 58b4c88b3c..653c6c0bcc 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -1026,6 +1026,58 @@ impl BeaconNodeHttpClient { .transpose() } + /// `POST beacon/rewards/sync_committee` + pub async fn post_beacon_rewards_sync_committee( + &self, + rewards: &[Option>], + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("rewards") + .push("sync_committee"); + + self.post(path, &rewards).await?; + + Ok(()) + } + + /// `GET beacon/rewards/blocks` + pub async fn get_beacon_rewards_blocks(&self, epoch: Epoch) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("rewards") + .push("blocks"); + + path.query_pairs_mut() + .append_pair("epoch", &epoch.to_string()); + + self.get(path).await + } + + /// `POST beacon/rewards/attestations` + pub async fn post_beacon_rewards_attestations( + &self, + attestations: &[ValidatorId], + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("rewards") + .push("attestations"); + + self.post(path, &attestations).await?; + + Ok(()) + } + /// `POST validator/contribution_and_proofs` pub async fn post_validator_contribution_and_proofs( &self, diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 2dced1c449..e50d9f4dc0 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -1,8 +1,11 @@ //! This module contains endpoints that are non-standard and only available on Lighthouse servers. mod attestation_performance; +pub mod attestation_rewards; mod block_packing_efficiency; mod block_rewards; +mod standard_block_rewards; +mod sync_committee_rewards; use crate::{ ok_or_error, @@ -22,11 +25,14 @@ use store::{AnchorInfo, Split, StoreConfig}; pub use attestation_performance::{ AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics, }; +pub use attestation_rewards::StandardAttestationRewards; pub use block_packing_efficiency::{ BlockPackingEfficiency, BlockPackingEfficiencyQuery, ProposerInfo, UniqueAttestation, }; pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery}; pub use lighthouse_network::{types::SyncState, PeerInfo}; +pub use standard_block_rewards::StandardBlockReward; +pub use sync_committee_rewards::SyncCommitteeReward; // Define "legacy" implementations of `Option` which use four bytes for encoding the union // selector. diff --git a/common/eth2/src/lighthouse/attestation_rewards.rs b/common/eth2/src/lighthouse/attestation_rewards.rs new file mode 100644 index 0000000000..3fd59782c8 --- /dev/null +++ b/common/eth2/src/lighthouse/attestation_rewards.rs @@ -0,0 +1,42 @@ +use serde::{Deserialize, Serialize}; + +// Details about the rewards paid for attestations +// All rewards in GWei + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub struct IdealAttestationRewards { + // Validator's effective balance in gwei + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub effective_balance: u64, + // Ideal attester's reward for head vote in gwei + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub head: u64, + // Ideal attester's reward for target vote in gwei + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub target: u64, + // Ideal attester's reward for source vote in gwei + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub source: u64, +} + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub struct TotalAttestationRewards { + // one entry for every validator based on their attestations in the epoch + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub validator_index: u64, + // attester's reward for head vote in gwei + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub head: u64, + // attester's reward for target vote in gwei + pub target: i64, + // attester's reward for source vote in gwei + pub source: i64, + // TBD attester's inclusion_delay reward in gwei (phase0 only) + // pub inclusion_delay: u64, +} + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub struct StandardAttestationRewards { + pub ideal_rewards: Vec, + pub total_rewards: Vec, +} diff --git a/common/eth2/src/lighthouse/standard_block_rewards.rs b/common/eth2/src/lighthouse/standard_block_rewards.rs new file mode 100644 index 0000000000..502577500d --- /dev/null +++ b/common/eth2/src/lighthouse/standard_block_rewards.rs @@ -0,0 +1,26 @@ +use serde::{Deserialize, Serialize}; + +// Details about the rewards for a single block +// All rewards in GWei +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +pub struct StandardBlockReward { + // proposer of the block, the proposer index who receives these rewards + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub proposer_index: u64, + // total block reward in gwei, + // equal to attestations + sync_aggregate + proposer_slashings + attester_slashings + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub total: u64, + // block reward component due to included attestations in gwei + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub attestations: u64, + // block reward component due to included sync_aggregate in gwei + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub sync_aggregate: u64, + // block reward component due to included proposer_slashings in gwei + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub proposer_slashings: u64, + // block reward component due to included attester_slashings in gwei + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub attester_slashings: u64, +} diff --git a/common/eth2/src/lighthouse/sync_committee_rewards.rs b/common/eth2/src/lighthouse/sync_committee_rewards.rs new file mode 100644 index 0000000000..cdd6850650 --- /dev/null +++ b/common/eth2/src/lighthouse/sync_committee_rewards.rs @@ -0,0 +1,12 @@ +use serde::{Deserialize, Serialize}; + +// Details about the rewards paid to sync committee members for attesting headers +// All rewards in GWei + +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +pub struct SyncCommitteeReward { + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub validator_index: u64, + // sync committee reward in gwei for the validator + pub reward: i64, +} diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 213bb1eb02..44147135c1 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -270,11 +270,20 @@ pub struct FinalityCheckpointsData { } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(try_from = "&str")] pub enum ValidatorId { PublicKey(PublicKeyBytes), Index(u64), } +impl TryFrom<&str> for ValidatorId { + type Error = String; + + fn try_from(s: &str) -> Result { + Self::from_str(s) + } +} + impl FromStr for ValidatorId { type Err = String; diff --git a/common/malloc_utils/src/lib.rs b/common/malloc_utils/src/lib.rs index 1eab8b3baa..3bb242369f 100644 --- a/common/malloc_utils/src/lib.rs +++ b/common/malloc_utils/src/lib.rs @@ -2,18 +2,18 @@ //! //! ## Conditional Compilation //! -//! Presently, only configuration for "The GNU Allocator" from `glibc` is supported. All other -//! allocators are ignored. +//! This crate can be compiled with different feature flags to support different allocators: //! -//! It is assumed that if the following two statements are correct then we should expect to -//! configure `glibc`: +//! - Jemalloc, via the `jemalloc` feature. +//! - GNU malloc, if no features are set and the system supports it. +//! - The system allocator, if no features are set and the allocator is not GNU malloc. +//! +//! It is assumed that if Jemalloc is not in use, and the following two statements are correct then +//! we should expect to configure `glibc`: //! //! - `target_os = linux` //! - `target_env != musl` //! -//! In all other cases this library will not attempt to do anything (i.e., all functions are -//! no-ops). -//! //! If the above conditions are fulfilled but `glibc` still isn't present at runtime then a panic //! may be triggered. It is understood that there's no way to be certain that a compatible `glibc` //! is present: https://github.com/rust-lang/rust/issues/33244. diff --git a/consensus/state_processing/src/per_block_processing/process_operations.rs b/consensus/state_processing/src/per_block_processing/process_operations.rs index 83ebc15cb0..7521067aee 100644 --- a/consensus/state_processing/src/per_block_processing/process_operations.rs +++ b/consensus/state_processing/src/per_block_processing/process_operations.rs @@ -9,9 +9,9 @@ use safe_arith::SafeArith; use std::sync::Arc; use types::consts::altair::{PARTICIPATION_FLAG_WEIGHTS, PROPOSER_WEIGHT, WEIGHT_DENOMINATOR}; -pub fn process_operations<'a, T: EthSpec, Payload: ExecPayload>( +pub fn process_operations>( state: &mut BeaconState, - block_body: BeaconBlockBodyRef<'a, T, Payload>, + block_body: BeaconBlockBodyRef, verify_signatures: VerifySignatures, ctxt: &mut ConsensusContext, spec: &ChainSpec, @@ -230,9 +230,9 @@ pub fn process_attester_slashings( } /// Wrapper function to handle calling the correct version of `process_attestations` based on /// the fork. -pub fn process_attestations<'a, T: EthSpec, Payload: ExecPayload>( +pub fn process_attestations>( state: &mut BeaconState, - block_body: BeaconBlockBodyRef<'a, T, Payload>, + block_body: BeaconBlockBodyRef, verify_signatures: VerifySignatures, ctxt: &mut ConsensusContext, spec: &ChainSpec, diff --git a/testing/antithesis/Dockerfile.libvoidstar b/testing/antithesis/Dockerfile.libvoidstar index 32e2d5648d..bae1807329 100644 --- a/testing/antithesis/Dockerfile.libvoidstar +++ b/testing/antithesis/Dockerfile.libvoidstar @@ -1,11 +1,9 @@ -FROM rust:1.62.1-bullseye AS builder -RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake libclang-dev +FROM rust:1.66.1-bullseye AS builder +RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake libclang-dev protobuf-compiler COPY . lighthouse # Build lighthouse directly with a cargo build command, bypassing the Makefile. -# We have to use nightly in order to disable the new LLVM pass manager. -RUN rustup default nightly-2022-07-26 && cd lighthouse && LD_LIBRARY_PATH=/lighthouse/testing/antithesis/libvoidstar/ RUSTFLAGS="-Znew-llvm-pass-manager=no -Cpasses=sancov -Cllvm-args=-sanitizer-coverage-level=3 -Cllvm-args=-sanitizer-coverage-trace-pc-guard -Ccodegen-units=1 -Cdebuginfo=2 -L/lighthouse/testing/antithesis/libvoidstar/ -lvoidstar" cargo build --release --manifest-path lighthouse/Cargo.toml --target x86_64-unknown-linux-gnu --features modern --verbose --bin lighthouse - +RUN cd lighthouse && LD_LIBRARY_PATH=/lighthouse/testing/antithesis/libvoidstar/ RUSTFLAGS="-Cpasses=sancov-module -Cllvm-args=-sanitizer-coverage-level=3 -Cllvm-args=-sanitizer-coverage-trace-pc-guard -Ccodegen-units=1 -Cdebuginfo=2 -L/lighthouse/testing/antithesis/libvoidstar/ -lvoidstar" cargo build --release --manifest-path lighthouse/Cargo.toml --target x86_64-unknown-linux-gnu --features modern --verbose --bin lighthouse # build lcli binary directly with cargo install command, bypassing the makefile RUN cargo install --path /lighthouse/lcli --force --locked