diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000000..dac0163003 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,4 @@ +[env] +# Set the number of arenas to 16 when using jemalloc. +JEMALLOC_SYS_WITH_MALLOC_CONF = "abort_conf:true,narenas:16" + diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index a4e1fefe23..27f5dabedb 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -112,6 +112,7 @@ jobs: --platform=linux/${SHORT_ARCH} \ --file ./Dockerfile.cross . \ --tag ${IMAGE_NAME}:${VERSION}-${SHORT_ARCH}${VERSION_SUFFIX}${MODERNITY_SUFFIX} \ + --provenance=false \ --push build-docker-multiarch: name: build-docker-multiarch${{ matrix.modernity }} diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index 8d52f7fa7e..57fee71830 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -306,16 +306,6 @@ jobs: repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Typecheck benchmark code without running it run: make check-benches - check-consensus: - name: check-consensus - runs-on: ubuntu-latest - needs: cargo-fmt - steps: - - uses: actions/checkout@v3 - - name: Get latest version of stable Rust - run: rustup update stable - - name: Typecheck consensus code in strict mode - run: make check-consensus clippy: name: clippy runs-on: ubuntu-latest @@ -382,14 +372,12 @@ jobs: - uses: actions/checkout@v3 - name: Install Rust (${{ env.PINNED_NIGHTLY }}) run: rustup toolchain install $PINNED_NIGHTLY - # NOTE: cargo-udeps version is pinned until this issue is resolved: - # https://github.com/est31/cargo-udeps/issues/135 - name: Install Protoc uses: arduino/setup-protoc@e52d9eb8f7b63115df1ac544a1376fdbf5a39612 with: repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Install cargo-udeps - run: cargo install cargo-udeps --locked --force --version 0.1.30 + run: cargo install cargo-udeps --locked --force - name: Create Cargo config dir run: mkdir -p .cargo - name: Install custom Cargo config diff --git a/Cargo.lock b/Cargo.lock index 43fe52d74c..4046537ef6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2710,6 +2710,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "fs_extra" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" + [[package]] name = "funty" version = "1.1.0" @@ -3223,6 +3229,7 @@ dependencies = [ "eth2_ssz", "execution_layer", "futures", + "genesis", "hex", "lazy_static", "lighthouse_metrics", @@ -3610,6 +3617,38 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440" +[[package]] +name = "jemalloc-ctl" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1891c671f3db85d8ea8525dd43ab147f9977041911d24a03e5a36187a7bfde9" +dependencies = [ + "jemalloc-sys", + "libc", + "paste", +] + +[[package]] +name = "jemalloc-sys" +version = "0.5.2+5.3.0-patched" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "134163979b6eed9564c98637b710b40979939ba351f59952708234ea11b5f3f8" +dependencies = [ + "cc", + "fs_extra", + "libc", +] + +[[package]] +name = "jemallocator" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16c2514137880c52b0b4822b563fadd38257c1f380858addb74a400889696ea6" +dependencies = [ + "jemalloc-sys", + "libc", +] + [[package]] name = "js-sys" version = "0.3.60" @@ -3739,6 +3778,7 @@ dependencies = [ "lighthouse_network", "lighthouse_version", "log", + "malloc_utils", "sensitive_url", "serde", "serde_json", @@ -4548,6 +4588,8 @@ dependencies = [ name = "malloc_utils" version = "0.1.0" dependencies = [ + "jemalloc-ctl", + "jemallocator", "lazy_static", "libc", "lighthouse_metrics", diff --git a/Cargo.toml b/Cargo.toml index 88f3e69a99..76edfbfe81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,7 @@ members = [ "validator_client", "validator_client/slashing_protection", ] +resolver = "2" [patch] [patch.crates-io] diff --git a/Makefile b/Makefile index c81f43b0e0..e7628733cc 100644 --- a/Makefile +++ b/Makefile @@ -14,8 +14,16 @@ BUILD_PATH_AARCH64 = "target/$(AARCH64_TAG)/release" PINNED_NIGHTLY ?= nightly CLIPPY_PINNED_NIGHTLY=nightly-2022-05-19 +# List of features to use when building natively. Can be overriden via the environment. +# No jemalloc on Windows +ifeq ($(OS),Windows_NT) + FEATURES?= +else + FEATURES?=jemalloc +endif + # List of features to use when cross-compiling. Can be overridden via the environment. -CROSS_FEATURES ?= gnosis,slasher-lmdb,slasher-mdbx +CROSS_FEATURES ?= gnosis,slasher-lmdb,slasher-mdbx,jemalloc # Cargo profile for Cross builds. Default is for local builds, CI uses an override. CROSS_PROFILE ?= release @@ -104,10 +112,6 @@ cargo-fmt: check-benches: cargo check --workspace --benches -# Typechecks consensus code *without* allowing deprecated legacy arithmetic or metrics. -check-consensus: - cargo check -p state_processing --no-default-features - # Runs only the ef-test vectors. run-ef-tests: rm -rf $(EF_TESTS)/.accessed_file_log.txt diff --git a/README.md b/README.md index 859d5c4c63..3565882d6e 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ of the Lighthouse book. The best place for discussion is the [Lighthouse Discord server](https://discord.gg/cyAszAh). -Sign up to the [Lighthouse Development Updates](https://eepurl.com/dh9Lvb/) mailing list for email +Sign up to the [Lighthouse Development Updates](https://eepurl.com/dh9Lvb) mailing list for email notifications about releases, network status and other important information. Encrypt sensitive messages using our [PGP diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d82e0feb48..95d2b7922f 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2319,32 +2319,74 @@ impl BeaconChain { } /// Verify a signed BLS to execution change before allowing it to propagate on the gossip network. - pub fn verify_bls_to_execution_change_for_gossip( + pub fn verify_bls_to_execution_change_for_http_api( &self, bls_to_execution_change: SignedBlsToExecutionChange, ) -> Result, Error> { - let current_fork = self.spec.fork_name_at_slot::(self.slot()?); - if let ForkName::Base | ForkName::Altair | ForkName::Merge = current_fork { - // Disallow BLS to execution changes prior to the Capella fork. - return Err(Error::BlsToExecutionChangeBadFork(current_fork)); + // Before checking the gossip duplicate filter, check that no prior change is already + // in our op pool. Ignore these messages: do not gossip, do not try to override the pool. + match self + .op_pool + .bls_to_execution_change_in_pool_equals(&bls_to_execution_change) + { + Some(true) => return Ok(ObservationOutcome::AlreadyKnown), + Some(false) => return Err(Error::BlsToExecutionConflictsWithPool), + None => (), } - let wall_clock_state = self.wall_clock_state()?; + // Use the head state to save advancing to the wall-clock slot unnecessarily. The message is + // signed with respect to the genesis fork version, and the slot check for gossip is applied + // separately. This `Arc` clone of the head is nice and cheap. + let head_snapshot = self.head().snapshot; + let head_state = &head_snapshot.beacon_state; Ok(self .observed_bls_to_execution_changes .lock() - .verify_and_observe(bls_to_execution_change, &wall_clock_state, &self.spec)?) + .verify_and_observe(bls_to_execution_change, head_state, &self.spec)?) + } + + /// Verify a signed BLS to execution change before allowing it to propagate on the gossip network. + pub fn verify_bls_to_execution_change_for_gossip( + &self, + bls_to_execution_change: SignedBlsToExecutionChange, + ) -> Result, Error> { + // Ignore BLS to execution changes on gossip prior to Capella. + if !self.current_slot_is_post_capella()? { + return Err(Error::BlsToExecutionPriorToCapella); + } + self.verify_bls_to_execution_change_for_http_api(bls_to_execution_change) + .or_else(|e| { + // On gossip treat conflicts the same as duplicates [IGNORE]. + match e { + Error::BlsToExecutionConflictsWithPool => Ok(ObservationOutcome::AlreadyKnown), + e => Err(e), + } + }) + } + + /// Check if the current slot is greater than or equal to the Capella fork epoch. + pub fn current_slot_is_post_capella(&self) -> Result { + let current_fork = self.spec.fork_name_at_slot::(self.slot()?); + if let ForkName::Base | ForkName::Altair | ForkName::Merge = current_fork { + Ok(false) + } else { + Ok(true) + } } /// Import a BLS to execution change to the op pool. + /// + /// Return `true` if the change was added to the pool. pub fn import_bls_to_execution_change( &self, bls_to_execution_change: SigVerifiedOp, - ) { + ) -> bool { if self.eth1_chain.is_some() { self.op_pool - .insert_bls_to_execution_change(bls_to_execution_change); + .insert_bls_to_execution_change(bls_to_execution_change) + } else { + false } } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index fcb2c53a4a..1f727c3e17 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -162,6 +162,7 @@ pub enum BeaconChainError { BlockRewardSlotError, BlockRewardAttestationError, BlockRewardSyncError, + SyncCommitteeRewardsSyncError, HeadMissingFromForkChoice(Hash256), FinalizedBlockMissingFromForkChoice(Hash256), HeadBlockMissingFromForkChoice(Hash256), @@ -206,7 +207,8 @@ pub enum BeaconChainError { MissingPersistedForkChoice, CommitteePromiseFailed(oneshot_broadcast::Error), MaxCommitteePromises(usize), - BlsToExecutionChangeBadFork(ForkName), + BlsToExecutionPriorToCapella, + BlsToExecutionConflictsWithPool, InconsistentFork(InconsistentFork), ProposerHeadForkChoiceError(fork_choice::Error), BlobsUnavailable, diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 71449c76cb..e09cbdf310 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -43,6 +43,7 @@ pub mod schema_change; mod shuffling_cache; mod snapshot_cache; pub mod state_advance_timer; +pub mod sync_committee_rewards; pub mod sync_committee_verification; pub mod test_utils; mod timeout_rw_lock; diff --git a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs index ec9c90e735..20d7181808 100644 --- a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs +++ b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs @@ -2,6 +2,7 @@ use crate::{ beacon_chain::MAXIMUM_GOSSIP_CLOCK_DISPARITY, BeaconChain, BeaconChainError, BeaconChainTypes, }; use derivative::Derivative; +use eth2::types::Hash256; use slot_clock::SlotClock; use std::time::Duration; use strum::AsRefStr; @@ -36,6 +37,8 @@ pub enum Error { SigSlotStartIsNone, /// Failed to construct a LightClientOptimisticUpdate from state. FailedConstructingUpdate, + /// Unknown block with parent root. + UnknownBlockParentRoot(Hash256), /// Beacon chain error occured. BeaconChainError(BeaconChainError), LightClientUpdateError(LightClientUpdateError), @@ -58,6 +61,7 @@ impl From for Error { #[derivative(Clone(bound = "T: BeaconChainTypes"))] pub struct VerifiedLightClientOptimisticUpdate { light_client_optimistic_update: LightClientOptimisticUpdate, + pub parent_root: Hash256, seen_timestamp: Duration, } @@ -107,6 +111,16 @@ impl VerifiedLightClientOptimisticUpdate { None => return Err(Error::SigSlotStartIsNone), } + // check if we can process the optimistic update immediately + // otherwise queue + let canonical_root = light_client_optimistic_update + .attested_header + .canonical_root(); + + if canonical_root != head_block.message().parent_root() { + return Err(Error::UnknownBlockParentRoot(canonical_root)); + } + let optimistic_update = LightClientOptimisticUpdate::new(&chain.spec, head_block, &attested_state)?; @@ -119,6 +133,7 @@ impl VerifiedLightClientOptimisticUpdate { Ok(Self { light_client_optimistic_update, + parent_root: canonical_root, seen_timestamp, }) } diff --git a/beacon_node/beacon_chain/src/sync_committee_rewards.rs b/beacon_node/beacon_chain/src/sync_committee_rewards.rs new file mode 100644 index 0000000000..2221aa1d5e --- /dev/null +++ b/beacon_node/beacon_chain/src/sync_committee_rewards.rs @@ -0,0 +1,87 @@ +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; + +use eth2::lighthouse::SyncCommitteeReward; +use safe_arith::SafeArith; +use slog::error; +use state_processing::per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards; +use std::collections::HashMap; +use store::RelativeEpoch; +use types::{AbstractExecPayload, BeaconBlockRef, BeaconState}; + +impl BeaconChain { + pub fn compute_sync_committee_rewards>( + &self, + block: BeaconBlockRef<'_, T::EthSpec, Payload>, + state: &mut BeaconState, + ) -> Result, BeaconChainError> { + if block.slot() != state.slot() { + return Err(BeaconChainError::BlockRewardSlotError); + } + + let spec = &self.spec; + + state.build_committee_cache(RelativeEpoch::Current, spec)?; + + let sync_aggregate = block.body().sync_aggregate()?; + + let sync_committee = state.current_sync_committee()?.clone(); + + let sync_committee_indices = state.get_sync_committee_indices(&sync_committee)?; + + let (participant_reward_value, proposer_reward_per_bit) = + compute_sync_aggregate_rewards(state, spec).map_err(|e| { + error!( + self.log, "Error calculating sync aggregate rewards"; + "error" => ?e + ); + BeaconChainError::SyncCommitteeRewardsSyncError + })?; + + let mut balances = HashMap::::new(); + + let mut total_proposer_rewards = 0; + let proposer_index = state.get_beacon_proposer_index(block.slot(), spec)?; + + // Apply rewards to participant balances. Keep track of proposer rewards + for (validator_index, participant_bit) in sync_committee_indices + .iter() + .zip(sync_aggregate.sync_committee_bits.iter()) + { + let participant_balance = balances + .entry(*validator_index) + .or_insert_with(|| state.balances()[*validator_index]); + + if participant_bit { + participant_balance.safe_add_assign(participant_reward_value)?; + + balances + .entry(proposer_index) + .or_insert_with(|| state.balances()[proposer_index]) + .safe_add_assign(proposer_reward_per_bit)?; + + total_proposer_rewards.safe_add_assign(proposer_reward_per_bit)?; + } else { + *participant_balance = participant_balance.saturating_sub(participant_reward_value); + } + } + + Ok(balances + .iter() + .filter_map(|(i, new_balance)| { + let reward = if *i != proposer_index { + *new_balance as i64 - state.balances()[*i] as i64 + } else if sync_committee_indices.contains(i) { + *new_balance as i64 + - state.balances()[*i] as i64 + - total_proposer_rewards as i64 + } else { + return None; + }; + Some(SyncCommitteeReward { + validator_index: *i as u64, + reward, + }) + }) + .collect()) + } +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index f75f911d57..37cf488413 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -2,6 +2,7 @@ pub use crate::persisted_beacon_chain::PersistedBeaconChain; pub use crate::{ beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY}, migrate::MigratorConfig, + sync_committee_verification::Error as SyncCommitteeError, validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, BeaconChainError, NotifyExecutionLayer, ProduceBlockVerification, }; @@ -22,7 +23,7 @@ use execution_layer::{ }; use fork_choice::CountUnrealized; use futures::channel::mpsc::Receiver; -pub use genesis::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH}; +pub use genesis::{interop_genesis_state_with_eth1, DEFAULT_ETH1_BLOCK_HASH}; use int_to_bytes::int_to_bytes32; use kzg::TrustedSetup; use merkle_proof::MerkleTree; @@ -149,6 +150,7 @@ pub struct Builder { eth_spec_instance: T::EthSpec, spec: Option, validator_keypairs: Option>, + withdrawal_keypairs: Vec>, chain_config: Option, store_config: Option, #[allow(clippy::type_complexity)] @@ -180,7 +182,7 @@ impl Builder> { .unwrap(), ); let mutator = move |builder: BeaconChainBuilder<_>| { - let genesis_state = interop_genesis_state::( + let genesis_state = interop_genesis_state_with_eth1::( &validator_keypairs, HARNESS_GENESIS_TIME, Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH), @@ -241,7 +243,7 @@ impl Builder> { .expect("cannot build without validator keypairs"); let mutator = move |builder: BeaconChainBuilder<_>| { - let genesis_state = interop_genesis_state::( + let genesis_state = interop_genesis_state_with_eth1::( &validator_keypairs, HARNESS_GENESIS_TIME, Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH), @@ -283,6 +285,7 @@ where eth_spec_instance, spec: None, validator_keypairs: None, + withdrawal_keypairs: vec![], chain_config: None, store_config: None, store: None, @@ -308,6 +311,11 @@ where self } + pub fn withdrawal_keypairs(mut self, withdrawal_keypairs: Vec>) -> Self { + self.withdrawal_keypairs = withdrawal_keypairs; + self + } + pub fn default_spec(self) -> Self { self.spec_or_default(None) } @@ -545,6 +553,7 @@ where spec: chain.spec.clone(), chain: Arc::new(chain), validator_keypairs, + withdrawal_keypairs: self.withdrawal_keypairs, shutdown_receiver: Arc::new(Mutex::new(shutdown_receiver)), runtime: self.runtime, mock_execution_layer: self.mock_execution_layer, @@ -560,6 +569,12 @@ where /// Used for testing. pub struct BeaconChainHarness { pub validator_keypairs: Vec, + /// Optional BLS withdrawal keys for each validator. + /// + /// If a validator index is missing from this vec or their entry is `None` then either + /// no BLS withdrawal key was set for them (they had an address from genesis) or the test + /// initializer neglected to set this field. + pub withdrawal_keypairs: Vec>, pub chain: Arc>, pub spec: ChainSpec, @@ -1471,6 +1486,44 @@ where .sign(sk, &fork, genesis_validators_root, &self.chain.spec) } + pub fn make_bls_to_execution_change( + &self, + validator_index: u64, + address: Address, + ) -> SignedBlsToExecutionChange { + let keypair = self.get_withdrawal_keypair(validator_index); + self.make_bls_to_execution_change_with_keys( + validator_index, + address, + &keypair.pk, + &keypair.sk, + ) + } + + pub fn make_bls_to_execution_change_with_keys( + &self, + validator_index: u64, + address: Address, + pubkey: &PublicKey, + secret_key: &SecretKey, + ) -> SignedBlsToExecutionChange { + let genesis_validators_root = self.chain.genesis_validators_root; + BlsToExecutionChange { + validator_index, + from_bls_pubkey: pubkey.compress(), + to_execution_address: address, + } + .sign(secret_key, genesis_validators_root, &self.chain.spec) + } + + pub fn get_withdrawal_keypair(&self, validator_index: u64) -> &Keypair { + self.withdrawal_keypairs + .get(validator_index as usize) + .expect("BLS withdrawal key missing from harness") + .as_ref() + .expect("no withdrawal key for validator") + } + pub fn add_voluntary_exit( &self, block: &mut BeaconBlock, @@ -2021,6 +2074,30 @@ where (honest_head, faulty_head) } + + pub fn process_sync_contributions( + &self, + sync_contributions: HarnessSyncContributions, + ) -> Result<(), SyncCommitteeError> { + let mut verified_contributions = Vec::with_capacity(sync_contributions.len()); + + for (_, contribution_and_proof) in sync_contributions { + let signed_contribution_and_proof = contribution_and_proof.unwrap(); + + let verified_contribution = self + .chain + .verify_sync_contribution_for_gossip(signed_contribution_and_proof)?; + + verified_contributions.push(verified_contribution); + } + + for verified_contribution in verified_contributions { + self.chain + .add_contribution_to_block_inclusion_pool(verified_contribution)?; + } + + Ok(()) + } } // Junk `Debug` impl to satistfy certain trait bounds during testing. diff --git a/beacon_node/beacon_chain/tests/main.rs b/beacon_node/beacon_chain/tests/main.rs index 3b8c83594b..c81a547406 100644 --- a/beacon_node/beacon_chain/tests/main.rs +++ b/beacon_node/beacon_chain/tests/main.rs @@ -5,6 +5,7 @@ mod capella; mod merge; mod op_verification; mod payload_invalidation; +mod rewards; mod store_tests; mod sync_committee_verification; mod tests; diff --git a/beacon_node/beacon_chain/tests/rewards.rs b/beacon_node/beacon_chain/tests/rewards.rs new file mode 100644 index 0000000000..b61bea1242 --- /dev/null +++ b/beacon_node/beacon_chain/tests/rewards.rs @@ -0,0 +1,121 @@ +#![cfg(test)] + +use std::collections::HashMap; + +use beacon_chain::test_utils::{ + generate_deterministic_keypairs, BeaconChainHarness, EphemeralHarnessType, +}; +use beacon_chain::{ + test_utils::{AttestationStrategy, BlockStrategy, RelativeSyncCommittee}, + types::{Epoch, EthSpec, Keypair, MinimalEthSpec}, +}; +use lazy_static::lazy_static; + +pub const VALIDATOR_COUNT: usize = 64; + +lazy_static! { + static ref KEYPAIRS: Vec = generate_deterministic_keypairs(VALIDATOR_COUNT); +} + +fn get_harness() -> BeaconChainHarness> { + let mut spec = E::default_spec(); + + spec.altair_fork_epoch = Some(Epoch::new(0)); // We use altair for all tests + + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec) + .keypairs(KEYPAIRS.to_vec()) + .fresh_ephemeral_store() + .build(); + + harness.advance_slot(); + + harness +} + +#[tokio::test] +async fn test_sync_committee_rewards() { + let num_block_produced = MinimalEthSpec::slots_per_epoch(); + let harness = get_harness::(); + + let latest_block_root = harness + .extend_chain( + num_block_produced as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Create and add sync committee message to op_pool + let sync_contributions = harness.make_sync_contributions( + &harness.get_current_state(), + latest_block_root, + harness.get_current_slot(), + RelativeSyncCommittee::Current, + ); + + harness + .process_sync_contributions(sync_contributions) + .unwrap(); + + // Add block + let chain = &harness.chain; + let (head_state, head_state_root) = harness.get_current_state_and_root(); + let target_slot = harness.get_current_slot() + 1; + + let (block_root, mut state) = harness + .add_attested_block_at_slot(target_slot, head_state, head_state_root, &[]) + .await + .unwrap(); + + let block = harness.get_block(block_root).unwrap(); + let parent_block = chain + .get_blinded_block(&block.parent_root()) + .unwrap() + .unwrap(); + let parent_state = chain + .get_state(&parent_block.state_root(), Some(parent_block.slot())) + .unwrap() + .unwrap(); + + let reward_payload = chain + .compute_sync_committee_rewards(block.message(), &mut state) + .unwrap(); + + let rewards = reward_payload + .iter() + .map(|reward| (reward.validator_index, reward.reward)) + .collect::>(); + + let proposer_index = state + .get_beacon_proposer_index(target_slot, &MinimalEthSpec::default_spec()) + .unwrap(); + + let mut mismatches = vec![]; + + for validator in state.validators() { + let validator_index = state + .clone() + .get_validator_index(&validator.pubkey) + .unwrap() + .unwrap(); + let pre_state_balance = parent_state.balances()[validator_index]; + let post_state_balance = state.balances()[validator_index]; + let sync_committee_reward = rewards.get(&(validator_index as u64)).unwrap_or(&0); + + if validator_index == proposer_index { + continue; // Ignore proposer + } + + if pre_state_balance as i64 + *sync_committee_reward != post_state_balance as i64 { + mismatches.push(validator_index.to_string()); + } + } + + assert_eq!( + mismatches.len(), + 0, + "Expect 0 mismatches, but these validators have mismatches on balance: {} ", + mismatches.join(",") + ); +} diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 8a6ea9cfe1..622ea7aecd 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -1013,8 +1013,8 @@ fn check_shuffling_compatible( // Ensure blocks from abandoned forks are pruned from the Hot DB #[tokio::test] async fn prunes_abandoned_fork_between_two_finalized_checkpoints() { - const HONEST_VALIDATOR_COUNT: usize = 16 + 0; - const ADVERSARIAL_VALIDATOR_COUNT: usize = 8 - 0; + const HONEST_VALIDATOR_COUNT: usize = 32 + 0; + const ADVERSARIAL_VALIDATOR_COUNT: usize = 16 - 0; const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT; let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); let honest_validators: Vec = (0..HONEST_VALIDATOR_COUNT).collect(); @@ -1123,8 +1123,8 @@ async fn prunes_abandoned_fork_between_two_finalized_checkpoints() { #[tokio::test] async fn pruning_does_not_touch_abandoned_block_shared_with_canonical_chain() { - const HONEST_VALIDATOR_COUNT: usize = 16 + 0; - const ADVERSARIAL_VALIDATOR_COUNT: usize = 8 - 0; + const HONEST_VALIDATOR_COUNT: usize = 32 + 0; + const ADVERSARIAL_VALIDATOR_COUNT: usize = 16 - 0; const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT; let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); let honest_validators: Vec = (0..HONEST_VALIDATOR_COUNT).collect(); @@ -1255,8 +1255,8 @@ async fn pruning_does_not_touch_abandoned_block_shared_with_canonical_chain() { #[tokio::test] async fn pruning_does_not_touch_blocks_prior_to_finalization() { - const HONEST_VALIDATOR_COUNT: usize = 16; - const ADVERSARIAL_VALIDATOR_COUNT: usize = 8; + const HONEST_VALIDATOR_COUNT: usize = 32; + const ADVERSARIAL_VALIDATOR_COUNT: usize = 16; const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT; let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); let honest_validators: Vec = (0..HONEST_VALIDATOR_COUNT).collect(); @@ -1350,8 +1350,8 @@ async fn pruning_does_not_touch_blocks_prior_to_finalization() { #[tokio::test] async fn prunes_fork_growing_past_youngest_finalized_checkpoint() { - const HONEST_VALIDATOR_COUNT: usize = 16 + 0; - const ADVERSARIAL_VALIDATOR_COUNT: usize = 8 - 0; + const HONEST_VALIDATOR_COUNT: usize = 32 + 0; + const ADVERSARIAL_VALIDATOR_COUNT: usize = 16 - 0; const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT; let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); let honest_validators: Vec = (0..HONEST_VALIDATOR_COUNT).collect(); @@ -1495,8 +1495,8 @@ async fn prunes_fork_growing_past_youngest_finalized_checkpoint() { // This is to check if state outside of normal block processing are pruned correctly. #[tokio::test] async fn prunes_skipped_slots_states() { - const HONEST_VALIDATOR_COUNT: usize = 16 + 0; - const ADVERSARIAL_VALIDATOR_COUNT: usize = 8 - 0; + const HONEST_VALIDATOR_COUNT: usize = 32 + 0; + const ADVERSARIAL_VALIDATOR_COUNT: usize = 16 - 0; const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT; let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); let honest_validators: Vec = (0..HONEST_VALIDATOR_COUNT).collect(); @@ -1624,8 +1624,8 @@ async fn prunes_skipped_slots_states() { // This is to check if state outside of normal block processing are pruned correctly. #[tokio::test] async fn finalizes_non_epoch_start_slot() { - const HONEST_VALIDATOR_COUNT: usize = 16 + 0; - const ADVERSARIAL_VALIDATOR_COUNT: usize = 8 - 0; + const HONEST_VALIDATOR_COUNT: usize = 32 + 0; + const ADVERSARIAL_VALIDATOR_COUNT: usize = 16 - 0; const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT; let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); let honest_validators: Vec = (0..HONEST_VALIDATOR_COUNT).collect(); diff --git a/beacon_node/beacon_chain/tests/sync_committee_verification.rs b/beacon_node/beacon_chain/tests/sync_committee_verification.rs index 1e51b0ffb9..239f55e7d3 100644 --- a/beacon_node/beacon_chain/tests/sync_committee_verification.rs +++ b/beacon_node/beacon_chain/tests/sync_committee_verification.rs @@ -45,6 +45,7 @@ fn get_valid_sync_committee_message( harness: &BeaconChainHarness>, slot: Slot, relative_sync_committee: RelativeSyncCommittee, + message_index: usize, ) -> (SyncCommitteeMessage, usize, SecretKey, SyncSubnetId) { let head_state = harness.chain.head_beacon_state_cloned(); let head_block_root = harness.chain.head_snapshot().beacon_block_root; @@ -52,7 +53,7 @@ fn get_valid_sync_committee_message( .make_sync_committee_messages(&head_state, head_block_root, slot, relative_sync_committee) .get(0) .expect("sync messages should exist") - .get(0) + .get(message_index) .expect("first sync message should exist") .clone(); @@ -494,7 +495,7 @@ async fn unaggregated_gossip_verification() { let current_slot = harness.chain.slot().expect("should get slot"); let (valid_sync_committee_message, expected_validator_index, validator_sk, subnet_id) = - get_valid_sync_committee_message(&harness, current_slot, RelativeSyncCommittee::Current); + get_valid_sync_committee_message(&harness, current_slot, RelativeSyncCommittee::Current, 0); macro_rules! assert_invalid { ($desc: tt, $attn_getter: expr, $subnet_getter: expr, $($error: pat_param) |+ $( if $guard: expr )?) => { @@ -644,7 +645,7 @@ async fn unaggregated_gossip_verification() { // **Incorrectly** create a sync message using the current sync committee let (next_valid_sync_committee_message, _, _, next_subnet_id) = - get_valid_sync_committee_message(&harness, target_slot, RelativeSyncCommittee::Current); + get_valid_sync_committee_message(&harness, target_slot, RelativeSyncCommittee::Current, 1); assert_invalid!( "sync message on incorrect subnet", diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index d80db132ef..384fcbe5db 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -19,7 +19,7 @@ use types::{ }; // Should ideally be divisible by 3. -pub const VALIDATOR_COUNT: usize = 24; +pub const VALIDATOR_COUNT: usize = 48; lazy_static! { /// A cached set of keys. diff --git a/beacon_node/genesis/src/interop.rs b/beacon_node/genesis/src/interop.rs index d8c25baec8..122ca8eda6 100644 --- a/beacon_node/genesis/src/interop.rs +++ b/beacon_node/genesis/src/interop.rs @@ -10,6 +10,20 @@ use types::{ pub const DEFAULT_ETH1_BLOCK_HASH: &[u8] = &[0x42; 32]; +pub fn bls_withdrawal_credentials(pubkey: &PublicKey, spec: &ChainSpec) -> Hash256 { + let mut credentials = hash(&pubkey.as_ssz_bytes()); + credentials[0] = spec.bls_withdrawal_prefix_byte; + Hash256::from_slice(&credentials) +} + +fn eth1_withdrawal_credentials(pubkey: &PublicKey, spec: &ChainSpec) -> Hash256 { + let fake_execution_address = &hash(&pubkey.as_ssz_bytes())[0..20]; + let mut credentials = [0u8; 32]; + credentials[0] = spec.eth1_address_withdrawal_prefix_byte; + credentials[12..].copy_from_slice(fake_execution_address); + Hash256::from_slice(&credentials) +} + /// Builds a genesis state as defined by the Eth2 interop procedure (see below). /// /// Reference: @@ -21,20 +35,75 @@ pub fn interop_genesis_state( execution_payload_header: Option>, spec: &ChainSpec, ) -> Result, String> { + let withdrawal_credentials = keypairs + .iter() + .map(|keypair| bls_withdrawal_credentials(&keypair.pk, spec)) + .collect::>(); + interop_genesis_state_with_withdrawal_credentials::( + keypairs, + &withdrawal_credentials, + genesis_time, + eth1_block_hash, + execution_payload_header, + spec, + ) +} + +// returns an interop genesis state except every other +// validator has eth1 withdrawal credentials +pub fn interop_genesis_state_with_eth1( + keypairs: &[Keypair], + genesis_time: u64, + eth1_block_hash: Hash256, + execution_payload_header: Option>, + spec: &ChainSpec, +) -> Result, String> { + let withdrawal_credentials = keypairs + .iter() + .enumerate() + .map(|(index, keypair)| { + if index % 2 == 0 { + bls_withdrawal_credentials(&keypair.pk, spec) + } else { + eth1_withdrawal_credentials(&keypair.pk, spec) + } + }) + .collect::>(); + interop_genesis_state_with_withdrawal_credentials::( + keypairs, + &withdrawal_credentials, + genesis_time, + eth1_block_hash, + execution_payload_header, + spec, + ) +} + +pub fn interop_genesis_state_with_withdrawal_credentials( + keypairs: &[Keypair], + withdrawal_credentials: &[Hash256], + genesis_time: u64, + eth1_block_hash: Hash256, + execution_payload_header: Option>, + spec: &ChainSpec, +) -> Result, String> { + if keypairs.len() != withdrawal_credentials.len() { + return Err(format!( + "wrong number of withdrawal credentials, expected: {}, got: {}", + keypairs.len(), + withdrawal_credentials.len() + )); + } + let eth1_timestamp = 2_u64.pow(40); let amount = spec.max_effective_balance; - let withdrawal_credentials = |pubkey: &PublicKey| { - let mut credentials = hash(&pubkey.as_ssz_bytes()); - credentials[0] = spec.bls_withdrawal_prefix_byte; - Hash256::from_slice(&credentials) - }; - let datas = keypairs .into_par_iter() - .map(|keypair| { + .zip(withdrawal_credentials.into_par_iter()) + .map(|(keypair, &withdrawal_credentials)| { let mut data = DepositData { - withdrawal_credentials: withdrawal_credentials(&keypair.pk), + withdrawal_credentials, pubkey: keypair.pk.clone().into(), amount, signature: Signature::empty().into(), @@ -133,4 +202,83 @@ mod test { "validator count should be correct" ); } + + #[test] + fn interop_state_with_eth1() { + let validator_count = 16; + let genesis_time = 42; + let spec = &TestEthSpec::default_spec(); + + let keypairs = generate_deterministic_keypairs(validator_count); + + let state = interop_genesis_state_with_eth1::( + &keypairs, + genesis_time, + Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH), + None, + spec, + ) + .expect("should build state"); + + assert_eq!( + state.eth1_data().block_hash, + Hash256::from_slice(&[0x42; 32]), + "eth1 block hash should be co-ordinated junk" + ); + + assert_eq!( + state.genesis_time(), + genesis_time, + "genesis time should be as specified" + ); + + for b in state.balances() { + assert_eq!( + *b, spec.max_effective_balance, + "validator balances should be max effective balance" + ); + } + + for (index, v) in state.validators().iter().enumerate() { + let creds = v.withdrawal_credentials.as_bytes(); + if index % 2 == 0 { + assert_eq!( + creds[0], spec.bls_withdrawal_prefix_byte, + "first byte of withdrawal creds should be bls prefix" + ); + assert_eq!( + &creds[1..], + &hash(&v.pubkey.as_ssz_bytes())[1..], + "rest of withdrawal creds should be pubkey hash" + ); + } else { + assert_eq!( + creds[0], spec.eth1_address_withdrawal_prefix_byte, + "first byte of withdrawal creds should be eth1 prefix" + ); + assert_eq!( + creds[1..12], + [0u8; 11], + "bytes [1:12] of withdrawal creds must be zero" + ); + assert_eq!( + &creds[12..], + &hash(&v.pubkey.as_ssz_bytes())[0..20], + "rest of withdrawal creds should be first 20 bytes of pubkey hash" + ) + } + } + + assert_eq!( + state.balances().len(), + validator_count, + "validator balances len should be correct" + ); + + assert_eq!( + state.validators().len(), + validator_count, + "validator count should be correct" + ); + } } diff --git a/beacon_node/genesis/src/lib.rs b/beacon_node/genesis/src/lib.rs index 1233d99fd3..3fb053bf88 100644 --- a/beacon_node/genesis/src/lib.rs +++ b/beacon_node/genesis/src/lib.rs @@ -5,5 +5,8 @@ mod interop; pub use eth1::Config as Eth1Config; pub use eth1::Eth1Endpoint; pub use eth1_genesis_service::{Eth1GenesisService, Statistics}; -pub use interop::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH}; +pub use interop::{ + bls_withdrawal_credentials, interop_genesis_state, interop_genesis_state_with_eth1, + interop_genesis_state_with_withdrawal_credentials, DEFAULT_ETH1_BLOCK_HASH, +}; pub use types::test_utils::generate_deterministic_keypairs; diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 077e3aa7cd..0dc918f425 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -45,6 +45,7 @@ logging = { path = "../../common/logging" } serde_json = "1.0.58" proto_array = { path = "../../consensus/proto_array" } unused_port = {path = "../../common/unused_port"} +genesis = { path = "../genesis" } [[test]] name = "bn_http_api_tests" diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 400f7a4f68..45d7b10023 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -16,6 +16,7 @@ mod metrics; mod proposer_duties; mod publish_blocks; mod state_id; +mod sync_committee_rewards; mod sync_committees; mod ui; mod validator_inclusion; @@ -1671,7 +1672,7 @@ pub fn serve( .and_then( |chain: Arc>, address_changes: Vec, - #[allow(unused)] network_tx: UnboundedSender>, + network_tx: UnboundedSender>, log: Logger| { blocking_json_task(move || { let mut failures = vec![]; @@ -1679,15 +1680,38 @@ pub fn serve( for (index, address_change) in address_changes.into_iter().enumerate() { let validator_index = address_change.message.validator_index; - match chain.verify_bls_to_execution_change_for_gossip(address_change) { + match chain.verify_bls_to_execution_change_for_http_api(address_change) { Ok(ObservationOutcome::New(verified_address_change)) => { - publish_pubsub_message( - &network_tx, - PubsubMessage::BlsToExecutionChange(Box::new( - verified_address_change.as_inner().clone(), - )), - )?; - chain.import_bls_to_execution_change(verified_address_change); + let validator_index = + verified_address_change.as_inner().message.validator_index; + let address = verified_address_change + .as_inner() + .message + .to_execution_address; + + // New to P2P *and* op pool, gossip immediately if post-Capella. + let publish = chain.current_slot_is_post_capella().unwrap_or(false); + if publish { + publish_pubsub_message( + &network_tx, + PubsubMessage::BlsToExecutionChange(Box::new( + verified_address_change.as_inner().clone(), + )), + )?; + } + + // Import to op pool (may return `false` if there's a race). + let imported = + chain.import_bls_to_execution_change(verified_address_change); + + info!( + log, + "Processed BLS to execution change"; + "validator_index" => validator_index, + "address" => ?address, + "published" => publish, + "imported" => imported, + ); } Ok(ObservationOutcome::AlreadyKnown) => { debug!( @@ -1697,11 +1721,12 @@ pub fn serve( ); } Err(e) => { - error!( + warn!( log, "Invalid BLS to execution change"; "validator_index" => validator_index, - "source" => "HTTP API", + "reason" => ?e, + "source" => "HTTP", ); failures.push(api_types::Failure::new( index, @@ -1770,6 +1795,41 @@ pub fn serve( }, ); + /* + * beacon/rewards + */ + + let beacon_rewards_path = eth_v1 + .and(warp::path("beacon")) + .and(warp::path("rewards")) + .and(chain_filter.clone()); + + // POST beacon/rewards/sync_committee/{block_id} + let post_beacon_rewards_sync_committee = beacon_rewards_path + .clone() + .and(warp::path("sync_committee")) + .and(block_id_or_err) + .and(warp::path::end()) + .and(warp::body::json()) + .and(log_filter.clone()) + .and_then( + |chain: Arc>, + block_id: BlockId, + validators: Vec, + log: Logger| { + blocking_json_task(move || { + let (rewards, execution_optimistic) = + sync_committee_rewards::compute_sync_committee_rewards( + chain, block_id, validators, log, + )?; + + Ok(rewards) + .map(api_types::GenericResponse::from) + .map(|resp| resp.add_execution_optimistic(execution_optimistic)) + }) + }, + ); + /* * config */ @@ -3491,7 +3551,8 @@ pub fn serve( .or(get_lighthouse_block_packing_efficiency.boxed()) .or(get_lighthouse_merge_readiness.boxed()) .or(get_lighthouse_blobs_sidecars.boxed()) - .or(get_events.boxed()), + .or(get_events.boxed()) + .recover(warp_utils::reject::handle_rejection), ) .boxed() .or(warp::post().and( @@ -3503,6 +3564,7 @@ pub fn serve( .or(post_beacon_pool_proposer_slashings.boxed()) .or(post_beacon_pool_voluntary_exits.boxed()) .or(post_beacon_pool_sync_committees.boxed()) + .or(post_beacon_rewards_sync_committee.boxed()) .or(post_beacon_pool_bls_to_execution_changes.boxed()) .or(post_validator_duties_attester.boxed()) .or(post_validator_duties_sync.boxed()) @@ -3516,7 +3578,8 @@ pub fn serve( .or(post_lighthouse_database_reconstruct.boxed()) .or(post_lighthouse_database_historical_blocks.boxed()) .or(post_lighthouse_block_rewards.boxed()) - .or(post_lighthouse_ui_validator_metrics.boxed()), + .or(post_lighthouse_ui_validator_metrics.boxed()) + .recover(warp_utils::reject::handle_rejection), )) .recover(warp_utils::reject::handle_rejection) .with(slog_logging(log.clone())) diff --git a/beacon_node/http_api/src/sync_committee_rewards.rs b/beacon_node/http_api/src/sync_committee_rewards.rs new file mode 100644 index 0000000000..ae369115d5 --- /dev/null +++ b/beacon_node/http_api/src/sync_committee_rewards.rs @@ -0,0 +1,77 @@ +use crate::{BlockId, ExecutionOptimistic}; +use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use eth2::lighthouse::SyncCommitteeReward; +use eth2::types::ValidatorId; +use slog::{debug, Logger}; +use state_processing::BlockReplayer; +use std::sync::Arc; +use types::{BeaconState, SignedBlindedBeaconBlock}; +use warp_utils::reject::{beacon_chain_error, custom_not_found}; + +pub fn compute_sync_committee_rewards( + chain: Arc>, + block_id: BlockId, + validators: Vec, + log: Logger, +) -> Result<(Option>, ExecutionOptimistic), warp::Rejection> { + let (block, execution_optimistic) = block_id.blinded_block(&chain)?; + + let mut state = get_state_before_applying_block(chain.clone(), &block)?; + + let reward_payload = chain + .compute_sync_committee_rewards(block.message(), &mut state) + .map_err(beacon_chain_error)?; + + let data = if reward_payload.is_empty() { + debug!(log, "compute_sync_committee_rewards returned empty"); + None + } else if validators.is_empty() { + Some(reward_payload) + } else { + Some( + reward_payload + .into_iter() + .filter(|reward| { + validators.iter().any(|validator| match validator { + ValidatorId::Index(i) => reward.validator_index == *i, + ValidatorId::PublicKey(pubkey) => match state.get_validator_index(pubkey) { + Ok(Some(i)) => reward.validator_index == i as u64, + _ => false, + }, + }) + }) + .collect::>(), + ) + }; + + Ok((data, execution_optimistic)) +} + +fn get_state_before_applying_block( + chain: Arc>, + block: &SignedBlindedBeaconBlock, +) -> Result, warp::reject::Rejection> { + let parent_block: SignedBlindedBeaconBlock = chain + .get_blinded_block(&block.parent_root()) + .and_then(|maybe_block| { + maybe_block.ok_or_else(|| BeaconChainError::MissingBeaconBlock(block.parent_root())) + }) + .map_err(|e| custom_not_found(format!("Parent block is not available! {:?}", e)))?; + + let parent_state = chain + .get_state(&parent_block.state_root(), Some(parent_block.slot())) + .and_then(|maybe_state| { + maybe_state + .ok_or_else(|| BeaconChainError::MissingBeaconState(parent_block.state_root())) + }) + .map_err(|e| custom_not_found(format!("Parent state is not available! {:?}", e)))?; + + let replayer = BlockReplayer::new(parent_state, &chain.spec) + .no_signature_verification() + .state_root_iter([Ok((parent_block.state_root(), parent_block.slot()))].into_iter()) + .minimal_block_root_verification() + .apply_blocks(vec![], Some(block.slot())) + .map_err(beacon_chain_error)?; + + Ok(replayer.into_state()) +} diff --git a/beacon_node/http_api/tests/common.rs b/beacon_node/http_api/tests/common.rs index 7c228d9803..ee02735797 100644 --- a/beacon_node/http_api/tests/common.rs +++ b/beacon_node/http_api/tests/common.rs @@ -1,5 +1,7 @@ use beacon_chain::{ - test_utils::{BeaconChainHarness, BoxedMutator, EphemeralHarnessType}, + test_utils::{ + BeaconChainHarness, BoxedMutator, Builder as HarnessBuilder, EphemeralHarnessType, + }, BeaconChain, BeaconChainTypes, }; use directory::DEFAULT_ROOT_DIR; @@ -55,25 +57,39 @@ pub struct ApiServer> { pub external_peer_id: PeerId, } +type Initializer = Box< + dyn FnOnce(HarnessBuilder>) -> HarnessBuilder>, +>; type Mutator = BoxedMutator, MemoryStore>; impl InteractiveTester { pub async fn new(spec: Option, validator_count: usize) -> Self { - Self::new_with_mutator(spec, validator_count, None).await + Self::new_with_initializer_and_mutator(spec, validator_count, None, None).await } - pub async fn new_with_mutator( + pub async fn new_with_initializer_and_mutator( spec: Option, validator_count: usize, + initializer: Option>, mutator: Option>, ) -> Self { let mut harness_builder = BeaconChainHarness::builder(E::default()) .spec_or_default(spec) - .deterministic_keypairs(validator_count) .logger(test_logger()) - .mock_execution_layer() - .fresh_ephemeral_store(); + .mock_execution_layer(); + harness_builder = if let Some(initializer) = initializer { + // Apply custom initialization provided by the caller. + initializer(harness_builder) + } else { + // Apply default initial configuration. + harness_builder + .deterministic_keypairs(validator_count) + .fresh_ephemeral_store() + }; + + // Add a mutator for the beacon chain builder which will be called in + // `HarnessBuilder::build`. if let Some(mutator) = mutator { harness_builder = harness_builder.initial_mutator(mutator); } diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs index 942a1167c2..e61470fe95 100644 --- a/beacon_node/http_api/tests/fork_tests.rs +++ b/beacon_node/http_api/tests/fork_tests.rs @@ -1,8 +1,15 @@ //! Tests for API behaviour across fork boundaries. use crate::common::*; -use beacon_chain::{test_utils::RelativeSyncCommittee, StateSkipConfig}; -use eth2::types::{StateId, SyncSubcommittee}; -use types::{ChainSpec, Epoch, EthSpec, MinimalEthSpec, Slot}; +use beacon_chain::{ + test_utils::{RelativeSyncCommittee, DEFAULT_ETH1_BLOCK_HASH, HARNESS_GENESIS_TIME}, + StateSkipConfig, +}; +use eth2::types::{IndexedErrorMessage, StateId, SyncSubcommittee}; +use genesis::{bls_withdrawal_credentials, interop_genesis_state_with_withdrawal_credentials}; +use types::{ + test_utils::{generate_deterministic_keypair, generate_deterministic_keypairs}, + Address, ChainSpec, Epoch, EthSpec, Hash256, MinimalEthSpec, Slot, +}; type E = MinimalEthSpec; @@ -12,6 +19,14 @@ fn altair_spec(altair_fork_epoch: Epoch) -> ChainSpec { spec } +fn capella_spec(capella_fork_epoch: Epoch) -> ChainSpec { + let mut spec = E::default_spec(); + spec.altair_fork_epoch = Some(Epoch::new(0)); + spec.bellatrix_fork_epoch = Some(Epoch::new(0)); + spec.capella_fork_epoch = Some(capella_fork_epoch); + spec +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn sync_committee_duties_across_fork() { let validator_count = E::sync_committee_size(); @@ -307,3 +322,203 @@ async fn sync_committee_indices_across_fork() { ); } } + +/// Assert that an HTTP API error has the given status code and indexed errors for the given indices. +fn assert_server_indexed_error(error: eth2::Error, status_code: u16, indices: Vec) { + let eth2::Error::ServerIndexedMessage(IndexedErrorMessage { + code, + failures, + .. + }) = error else { + panic!("wrong error, expected ServerIndexedMessage, got: {error:?}") + }; + assert_eq!(code, status_code); + assert_eq!(failures.len(), indices.len()); + for (index, failure) in indices.into_iter().zip(failures) { + assert_eq!(failure.index, index as u64); + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn bls_to_execution_changes_update_all_around_capella_fork() { + let validator_count = 128; + let fork_epoch = Epoch::new(2); + let spec = capella_spec(fork_epoch); + let max_bls_to_execution_changes = E::max_bls_to_execution_changes(); + + // Use a genesis state with entirely BLS withdrawal credentials. + // Offset keypairs by `validator_count` to create keys distinct from the signing keys. + let validator_keypairs = generate_deterministic_keypairs(validator_count); + let withdrawal_keypairs = (0..validator_count) + .map(|i| Some(generate_deterministic_keypair(i + validator_count))) + .collect::>(); + let withdrawal_credentials = withdrawal_keypairs + .iter() + .map(|keypair| bls_withdrawal_credentials(&keypair.as_ref().unwrap().pk, &spec)) + .collect::>(); + let genesis_state = interop_genesis_state_with_withdrawal_credentials( + &validator_keypairs, + &withdrawal_credentials, + HARNESS_GENESIS_TIME, + Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH), + None, + &spec, + ) + .unwrap(); + + let tester = InteractiveTester::::new_with_initializer_and_mutator( + Some(spec.clone()), + validator_count, + Some(Box::new(|harness_builder| { + harness_builder + .keypairs(validator_keypairs) + .withdrawal_keypairs(withdrawal_keypairs) + .genesis_state_ephemeral_store(genesis_state) + })), + None, + ) + .await; + let harness = &tester.harness; + let client = &tester.client; + + let all_validators = harness.get_all_validators(); + let all_validators_u64 = all_validators.iter().map(|x| *x as u64).collect::>(); + + // Create a bunch of valid address changes. + let valid_address_changes = all_validators_u64 + .iter() + .map(|&validator_index| { + harness.make_bls_to_execution_change( + validator_index, + Address::from_low_u64_be(validator_index), + ) + }) + .collect::>(); + + // Address changes which conflict with `valid_address_changes` on the address chosen. + let conflicting_address_changes = all_validators_u64 + .iter() + .map(|&validator_index| { + harness.make_bls_to_execution_change( + validator_index, + Address::from_low_u64_be(validator_index + 1), + ) + }) + .collect::>(); + + // Address changes signed with the wrong key. + let wrong_key_address_changes = all_validators_u64 + .iter() + .map(|&validator_index| { + // Use the correct pubkey. + let pubkey = &harness.get_withdrawal_keypair(validator_index).pk; + // And the wrong secret key. + let secret_key = &harness + .get_withdrawal_keypair((validator_index + 1) % validator_count as u64) + .sk; + harness.make_bls_to_execution_change_with_keys( + validator_index, + Address::from_low_u64_be(validator_index), + pubkey, + secret_key, + ) + }) + .collect::>(); + + // Submit some changes before Capella. Just enough to fill two blocks. + let num_pre_capella = validator_count / 4; + let blocks_filled_pre_capella = 2; + assert_eq!( + num_pre_capella, + blocks_filled_pre_capella * max_bls_to_execution_changes + ); + + client + .post_beacon_pool_bls_to_execution_changes(&valid_address_changes[..num_pre_capella]) + .await + .unwrap(); + + // Conflicting changes for the same validators should all fail. + let error = client + .post_beacon_pool_bls_to_execution_changes(&conflicting_address_changes[..num_pre_capella]) + .await + .unwrap_err(); + assert_server_indexed_error(error, 400, (0..num_pre_capella).collect()); + + // Re-submitting the same changes should be accepted. + client + .post_beacon_pool_bls_to_execution_changes(&valid_address_changes[..num_pre_capella]) + .await + .unwrap(); + + // Invalid changes signed with the wrong keys should all be rejected without affecting the seen + // indices filters (apply ALL of them). + let error = client + .post_beacon_pool_bls_to_execution_changes(&wrong_key_address_changes) + .await + .unwrap_err(); + assert_server_indexed_error(error, 400, all_validators.clone()); + + // Advance to right before Capella. + let capella_slot = fork_epoch.start_slot(E::slots_per_epoch()); + harness.extend_to_slot(capella_slot - 1).await; + assert_eq!(harness.head_slot(), capella_slot - 1); + + // Add Capella blocks which should be full of BLS to execution changes. + for i in 0..validator_count / max_bls_to_execution_changes { + let head_block_root = harness.extend_slots(1).await; + let head_block = harness + .chain + .get_block(&head_block_root) + .await + .unwrap() + .unwrap(); + + let bls_to_execution_changes = head_block + .message() + .body() + .bls_to_execution_changes() + .unwrap(); + + // Block should be full. + assert_eq!( + bls_to_execution_changes.len(), + max_bls_to_execution_changes, + "block not full on iteration {i}" + ); + + // Included changes should be the ones from `valid_address_changes` in any order. + for address_change in bls_to_execution_changes.iter() { + assert!(valid_address_changes.contains(address_change)); + } + + // After the initial 2 blocks, add the rest of the changes using a large + // request containing all the valid, all the conflicting and all the invalid. + // Despite the invalid and duplicate messages, the new ones should still get picked up by + // the pool. + if i == blocks_filled_pre_capella - 1 { + let all_address_changes: Vec<_> = [ + valid_address_changes.clone(), + conflicting_address_changes.clone(), + wrong_key_address_changes.clone(), + ] + .concat(); + + let error = client + .post_beacon_pool_bls_to_execution_changes(&all_address_changes) + .await + .unwrap_err(); + assert_server_indexed_error( + error, + 400, + (validator_count..3 * validator_count).collect(), + ); + } + } + + // Eventually all validators should have eth1 withdrawal credentials. + let head_state = harness.get_current_state(); + for validator in head_state.validators() { + assert!(validator.has_eth1_withdrawal_credential(&spec)); + } +} diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 04d527d531..7096fac425 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -278,9 +278,10 @@ pub async fn proposer_boost_re_org_test( let num_empty_votes = Some(attesters_per_slot * percent_empty_votes / 100); let num_head_votes = Some(attesters_per_slot * percent_head_votes / 100); - let tester = InteractiveTester::::new_with_mutator( + let tester = InteractiveTester::::new_with_initializer_and_mutator( Some(spec), validator_count, + None, Some(Box::new(move |builder| { builder .proposer_re_org_threshold(Some(ReOrgThreshold(re_org_threshold))) @@ -544,7 +545,7 @@ pub async fn proposer_boost_re_org_test( pub async fn fork_choice_before_proposal() { // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing // `validator_count // 32`. - let validator_count = 32; + let validator_count = 64; let all_validators = (0..validator_count).collect::>(); let num_initial: u64 = 31; diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 3f3b6986da..e92f03fb91 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -70,7 +70,8 @@ use types::{ SyncCommitteeMessage, SyncSubnetId, }; use work_reprocessing_queue::{ - spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, + spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, + QueuedUnaggregate, ReadyWork, }; use worker::{Toolbox, Worker}; @@ -144,6 +145,10 @@ const MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN: usize = 1_024; /// before we start dropping them. const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024; +/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored +/// for reprocessing before we start dropping them. +const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128; + /// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping /// them. const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048; @@ -233,6 +238,7 @@ pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request"; pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; +pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update"; pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change"; /// A simple first-in-first-out queue with a maximum length. @@ -781,6 +787,21 @@ impl std::convert::From> for WorkEvent { seen_timestamp, }, }, + ReadyWork::LightClientUpdate(QueuedLightClientUpdate { + peer_id, + message_id, + light_client_optimistic_update, + seen_timestamp, + .. + }) => Self { + drop_during_sync: true, + work: Work::UnknownLightClientOptimisticUpdate { + message_id, + peer_id, + light_client_optimistic_update, + seen_timestamp, + }, + }, } } } @@ -820,6 +841,12 @@ pub enum Work { aggregate: Box>, seen_timestamp: Duration, }, + UnknownLightClientOptimisticUpdate { + message_id: MessageId, + peer_id: PeerId, + light_client_optimistic_update: Box>, + seen_timestamp: Duration, + }, GossipAggregateBatch { packages: Vec>, }, @@ -957,6 +984,7 @@ impl Work { Work::LightClientBootstrapRequest { .. } => LIGHT_CLIENT_BOOTSTRAP_REQUEST, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, + Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE, Work::GossipBlsToExecutionChange { .. } => GOSSIP_BLS_TO_EXECUTION_CHANGE, } } @@ -1092,6 +1120,8 @@ impl BeaconProcessor { // Using a FIFO queue for light client updates to maintain sequence order. let mut finality_update_queue = FifoQueue::new(MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN); let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN); + let mut unknown_light_client_update_queue = + FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN); // Using a FIFO queue since blocks need to be imported sequentially. let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); @@ -1483,6 +1513,9 @@ impl BeaconProcessor { Work::UnknownBlockAggregate { .. } => { unknown_block_aggregate_queue.push(work) } + Work::UnknownLightClientOptimisticUpdate { .. } => { + unknown_light_client_update_queue.push(work, work_id, &self.log) + } Work::GossipBlsToExecutionChange { .. } => { gossip_bls_to_execution_change_queue.push(work, work_id, &self.log) } @@ -1848,6 +1881,7 @@ impl BeaconProcessor { message_id, peer_id, *light_client_optimistic_update, + Some(work_reprocessing_tx), seen_timestamp, ) }), @@ -1998,6 +2032,20 @@ impl BeaconProcessor { seen_timestamp, ) }), + Work::UnknownLightClientOptimisticUpdate { + message_id, + peer_id, + light_client_optimistic_update, + seen_timestamp, + } => task_spawner.spawn_blocking(move || { + worker.process_gossip_optimistic_update( + message_id, + peer_id, + *light_client_optimistic_update, + None, + seen_timestamp, + ) + }), }; } } diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index bf9dfd9049..d7cede9ee7 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -20,7 +20,7 @@ use futures::task::Poll; use futures::{Stream, StreamExt}; use lighthouse_network::{MessageId, PeerId}; use logging::TimeLatch; -use slog::{crit, debug, error, warn, Logger}; +use slog::{crit, debug, error, trace, warn, Logger}; use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; use std::pin::Pin; @@ -30,12 +30,16 @@ use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::time::error::Error as TimeError; use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; -use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SubnetId}; +use types::{ + Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, + SubnetId, +}; const TASK_NAME: &str = "beacon_processor_reprocess_queue"; const GOSSIP_BLOCKS: &str = "gossip_blocks"; const RPC_BLOCKS: &str = "rpc_blocks"; const ATTESTATIONS: &str = "attestations"; +const LIGHT_CLIENT_UPDATES: &str = "lc_updates"; /// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts. /// This is to account for any slight drift in the system clock. @@ -44,6 +48,9 @@ const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5); /// For how long to queue aggregated and unaggregated attestations for re-processing. pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); +/// For how long to queue light client updates for re-processing. +pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12); + /// For how long to queue rpc blocks before sending them back for reprocessing. pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3); @@ -55,6 +62,9 @@ const MAXIMUM_QUEUED_BLOCKS: usize = 16; /// How many attestations we keep before new ones get dropped. const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; +/// How many light client updates we keep before new ones get dropped. +const MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES: usize = 128; + /// Messages that the scheduler can receive. pub enum ReprocessQueueMessage { /// A block that has been received early and we should queue for later processing. @@ -62,13 +72,18 @@ pub enum ReprocessQueueMessage { /// A gossip block for hash `X` is being imported, we should queue the rpc block for the same /// hash until the gossip block is imported. RpcBlock(QueuedRpcBlock), - /// A block that was successfully processed. We use this to handle attestations for unknown - /// blocks. - BlockImported(Hash256), + /// A block that was successfully processed. We use this to handle attestations and light client updates + /// for unknown blocks. + BlockImported { + block_root: Hash256, + parent_root: Hash256, + }, /// An unaggregated attestation that references an unknown block. UnknownBlockUnaggregate(QueuedUnaggregate), /// An aggregated attestation that references an unknown block. UnknownBlockAggregate(QueuedAggregate), + /// A light client optimistic update that references a parent root that has not been seen as a parent. + UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate), } /// Events sent by the scheduler once they are ready for re-processing. @@ -77,6 +92,7 @@ pub enum ReadyWork { RpcBlock(QueuedRpcBlock), Unaggregate(QueuedUnaggregate), Aggregate(QueuedAggregate), + LightClientUpdate(QueuedLightClientUpdate), } /// An Attestation for which the corresponding block was not seen while processing, queued for @@ -99,6 +115,16 @@ pub struct QueuedAggregate { pub seen_timestamp: Duration, } +/// A light client update for which the corresponding parent block was not seen while processing, +/// queued for later. +pub struct QueuedLightClientUpdate { + pub peer_id: PeerId, + pub message_id: MessageId, + pub light_client_optimistic_update: Box>, + pub parent_root: Hash256, + pub seen_timestamp: Duration, +} + /// A block that arrived early and has been queued for later import. pub struct QueuedGossipBlock { pub peer_id: PeerId, @@ -127,6 +153,8 @@ enum InboundEvent { ReadyRpcBlock(QueuedRpcBlock), /// An aggregated or unaggregated attestation is ready for re-processing. ReadyAttestation(QueuedAttestationId), + /// A light client update that is ready for re-processing. + ReadyLightClientUpdate(QueuedLightClientUpdateId), /// A `DelayQueue` returned an error. DelayQueueError(TimeError, &'static str), /// A message sent to the `ReprocessQueue` @@ -147,6 +175,8 @@ struct ReprocessQueue { rpc_block_delay_queue: DelayQueue>, /// Queue to manage scheduled attestations. attestations_delay_queue: DelayQueue, + /// Queue to manage scheduled light client updates. + lc_updates_delay_queue: DelayQueue, /* Queued items */ /// Queued blocks. @@ -157,15 +187,23 @@ struct ReprocessQueue { queued_unaggregates: FnvHashMap, DelayKey)>, /// Attestations (aggregated and unaggregated) per root. awaiting_attestations_per_root: HashMap>, + /// Queued Light Client Updates. + queued_lc_updates: FnvHashMap, DelayKey)>, + /// Light Client Updates per parent_root. + awaiting_lc_updates_per_parent_root: HashMap>, /* Aux */ /// Next attestation id, used for both aggregated and unaggregated attestations next_attestation: usize, + next_lc_update: usize, early_block_debounce: TimeLatch, rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, + lc_update_delay_debounce: TimeLatch, } +pub type QueuedLightClientUpdateId = usize; + #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum QueuedAttestationId { Aggregate(usize), @@ -235,6 +273,20 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + match self.lc_updates_delay_queue.poll_expired(cx) { + Poll::Ready(Some(Ok(lc_id))) => { + return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate( + lc_id.into_inner(), + ))); + } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "lc_updates_queue"))); + } + // `Poll::Ready(None)` means that there are no more entries in the delay queue and we + // will continue to get this result until something else is added into the queue. + Poll::Ready(None) | Poll::Pending => (), + } + // Last empty the messages channel. match self.work_reprocessing_rx.poll_recv(cx) { Poll::Ready(Some(message)) => return Poll::Ready(Some(InboundEvent::Msg(message))), @@ -264,14 +316,19 @@ pub fn spawn_reprocess_scheduler( gossip_block_delay_queue: DelayQueue::new(), rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), + lc_updates_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), + queued_lc_updates: FnvHashMap::default(), queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), awaiting_attestations_per_root: HashMap::new(), + awaiting_lc_updates_per_parent_root: HashMap::new(), next_attestation: 0, + next_lc_update: 0, early_block_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), + lc_update_delay_debounce: TimeLatch::default(), }; executor.spawn( @@ -473,9 +530,49 @@ impl ReprocessQueue { self.next_attestation += 1; } - InboundEvent::Msg(BlockImported(root)) => { + InboundEvent::Msg(UnknownLightClientOptimisticUpdate( + queued_light_client_optimistic_update, + )) => { + if self.lc_updates_delay_queue.len() >= MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES { + if self.lc_update_delay_debounce.elapsed() { + error!( + log, + "Light client updates delay queue is full"; + "queue_size" => MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES, + "msg" => "check system clock" + ); + } + // Drop the light client update. + return; + } + + let lc_id: QueuedLightClientUpdateId = self.next_lc_update; + + // Register the delay. + let delay_key = self + .lc_updates_delay_queue + .insert(lc_id, QUEUED_LIGHT_CLIENT_UPDATE_DELAY); + + // Register the light client update for the corresponding root. + self.awaiting_lc_updates_per_parent_root + .entry(queued_light_client_optimistic_update.parent_root) + .or_default() + .push(lc_id); + + // Store the light client update and its info. + self.queued_lc_updates.insert( + self.next_lc_update, + (queued_light_client_optimistic_update, delay_key), + ); + + self.next_lc_update += 1; + } + InboundEvent::Msg(BlockImported { + block_root, + parent_root, + }) => { // Unqueue the attestations we have for this root, if any. - if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&root) { + if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) { for id in queued_ids { metrics::inc_counter( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS, @@ -511,12 +608,62 @@ impl ReprocessQueue { error!( log, "Unknown queued attestation for block root"; - "block_root" => ?root, + "block_root" => ?block_root, "att_id" => ?id, ); } } } + // Unqueue the light client optimistic updates we have for this root, if any. + if let Some(queued_lc_id) = self + .awaiting_lc_updates_per_parent_root + .remove(&parent_root) + { + debug!( + log, + "Dequeuing light client optimistic updates"; + "parent_root" => %parent_root, + "count" => queued_lc_id.len(), + ); + + for lc_id in queued_lc_id { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_OPTIMISTIC_UPDATES, + ); + if let Some((work, delay_key)) = self.queued_lc_updates.remove(&lc_id).map( + |(light_client_optimistic_update, delay_key)| { + ( + ReadyWork::LightClientUpdate(light_client_optimistic_update), + delay_key, + ) + }, + ) { + // Remove the delay + self.lc_updates_delay_queue.remove(&delay_key); + + // Send the work + match self.ready_work_tx.try_send(work) { + Ok(_) => trace!( + log, + "reprocessing light client update sent"; + ), + Err(_) => error!( + log, + "Failed to send scheduled light client update"; + ), + } + } else { + // There is a mismatch between the light client update ids registered for this + // root and the queued light client updates. This should never happen. + error!( + log, + "Unknown queued light client update for parent root"; + "parent_root" => ?parent_root, + "lc_id" => ?lc_id, + ); + } + } + } } // A block that was queued for later processing is now ready to be processed. InboundEvent::ReadyGossipBlock(ready_block) => { @@ -591,6 +738,38 @@ impl ReprocessQueue { } } } + InboundEvent::ReadyLightClientUpdate(queued_id) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_OPTIMISTIC_UPDATES, + ); + + if let Some((parent_root, work)) = self.queued_lc_updates.remove(&queued_id).map( + |(queued_lc_update, _delay_key)| { + ( + queued_lc_update.parent_root, + ReadyWork::LightClientUpdate(queued_lc_update), + ) + }, + ) { + if self.ready_work_tx.try_send(work).is_err() { + error!( + log, + "Failed to send scheduled light client optimistic update"; + ); + } + + if let Some(queued_lc_updates) = self + .awaiting_lc_updates_per_parent_root + .get_mut(&parent_root) + { + if let Some(index) = + queued_lc_updates.iter().position(|&id| id == queued_id) + { + queued_lc_updates.swap_remove(index); + } + } + } + } } metrics::set_gauge_vec( @@ -608,5 +787,10 @@ impl ReprocessQueue { &[ATTESTATIONS], self.attestations_delay_queue.len() as i64, ); + metrics::set_gauge_vec( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, + &[LIGHT_CLIENT_UPDATES], + self.lc_updates_delay_queue.len() as i64, + ); } } diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 19d82c449a..7970a32b45 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -28,7 +28,8 @@ use types::{ use super::{ super::work_reprocessing_queue::{ - QueuedAggregate, QueuedGossipBlock, QueuedUnaggregate, ReprocessQueueMessage, + QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate, + ReprocessQueueMessage, }, Worker, }; @@ -716,6 +717,10 @@ impl Worker { &metrics::BEACON_BLOCK_GOSSIP_SLOT_START_DELAY_TIME, block_delay, ); + metrics::set_gauge( + &metrics::BEACON_BLOCK_LAST_DELAY, + block_delay.as_millis() as i64, + ); let verification_result = self .chain @@ -961,7 +966,10 @@ impl Worker { metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); if reprocess_tx - .try_send(ReprocessQueueMessage::BlockImported(block_root)) + .try_send(ReprocessQueueMessage::BlockImported { + block_root, + parent_root: block.message().parent_root(), + }) .is_err() { error!( @@ -1227,7 +1235,7 @@ impl Worker { "error" => ?e ); // We ignore pre-capella messages without penalizing peers. - if matches!(e, BeaconChainError::BlsToExecutionChangeBadFork(_)) { + if matches!(e, BeaconChainError::BlsToExecutionPriorToCapella) { self.propagate_validation_result( message_id, peer_id, @@ -1410,7 +1418,7 @@ impl Worker { LightClientFinalityUpdateError::InvalidLightClientFinalityUpdate => { debug!( self.log, - "LC invalid finality update"; + "Light client invalid finality update"; "peer" => %peer_id, "error" => ?e, ); @@ -1424,7 +1432,7 @@ impl Worker { LightClientFinalityUpdateError::TooEarly => { debug!( self.log, - "LC finality update too early"; + "Light client finality update too early"; "peer" => %peer_id, "error" => ?e, ); @@ -1437,7 +1445,7 @@ impl Worker { } LightClientFinalityUpdateError::FinalityUpdateAlreadySeen => debug!( self.log, - "LC finality update already seen"; + "Light client finality update already seen"; "peer" => %peer_id, "error" => ?e, ), @@ -1446,7 +1454,7 @@ impl Worker { | LightClientFinalityUpdateError::SigSlotStartIsNone | LightClientFinalityUpdateError::FailedConstructingUpdate => debug!( self.log, - "LC error constructing finality update"; + "Light client error constructing finality update"; "peer" => %peer_id, "error" => ?e, ), @@ -1461,22 +1469,77 @@ impl Worker { message_id: MessageId, peer_id: PeerId, light_client_optimistic_update: LightClientOptimisticUpdate, + reprocess_tx: Option>>, seen_timestamp: Duration, ) { - match self - .chain - .verify_optimistic_update_for_gossip(light_client_optimistic_update, seen_timestamp) - { - Ok(_verified_light_client_optimistic_update) => { + match self.chain.verify_optimistic_update_for_gossip( + light_client_optimistic_update.clone(), + seen_timestamp, + ) { + Ok(verified_light_client_optimistic_update) => { + debug!( + self.log, + "Light client successful optimistic update"; + "peer" => %peer_id, + "parent_root" => %verified_light_client_optimistic_update.parent_root, + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); } Err(e) => { - metrics::register_optimistic_update_error(&e); match e { - LightClientOptimisticUpdateError::InvalidLightClientOptimisticUpdate => { + LightClientOptimisticUpdateError::UnknownBlockParentRoot(parent_root) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_SENT_OPTIMISTIC_UPDATES, + ); debug!( self.log, - "LC invalid optimistic update"; + "Optimistic update for unknown block"; + "peer_id" => %peer_id, + "parent_root" => ?parent_root + ); + + if let Some(sender) = reprocess_tx { + let msg = ReprocessQueueMessage::UnknownLightClientOptimisticUpdate( + QueuedLightClientUpdate { + peer_id, + message_id, + light_client_optimistic_update: Box::new( + light_client_optimistic_update, + ), + parent_root, + seen_timestamp, + }, + ); + + if sender.try_send(msg).is_err() { + error!( + self.log, + "Failed to send optimistic update for re-processing"; + ) + } + } else { + debug!( + self.log, + "Not sending light client update because it had been reprocessed"; + "peer_id" => %peer_id, + "parent_root" => ?parent_root + ); + + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Ignore, + ); + } + return; + } + LightClientOptimisticUpdateError::InvalidLightClientOptimisticUpdate => { + metrics::register_optimistic_update_error(&e); + + debug!( + self.log, + "Light client invalid optimistic update"; "peer" => %peer_id, "error" => ?e, ); @@ -1488,9 +1551,10 @@ impl Worker { ) } LightClientOptimisticUpdateError::TooEarly => { + metrics::register_optimistic_update_error(&e); debug!( self.log, - "LC optimistic update too early"; + "Light client optimistic update too early"; "peer" => %peer_id, "error" => ?e, ); @@ -1501,21 +1565,29 @@ impl Worker { "light_client_gossip_error", ); } - LightClientOptimisticUpdateError::OptimisticUpdateAlreadySeen => debug!( - self.log, - "LC optimistic update already seen"; - "peer" => %peer_id, - "error" => ?e, - ), + LightClientOptimisticUpdateError::OptimisticUpdateAlreadySeen => { + metrics::register_optimistic_update_error(&e); + + debug!( + self.log, + "Light client optimistic update already seen"; + "peer" => %peer_id, + "error" => ?e, + ) + } LightClientOptimisticUpdateError::BeaconChainError(_) | LightClientOptimisticUpdateError::LightClientUpdateError(_) | LightClientOptimisticUpdateError::SigSlotStartIsNone - | LightClientOptimisticUpdateError::FailedConstructingUpdate => debug!( - self.log, - "LC error constructing optimistic update"; - "peer" => %peer_id, - "error" => ?e, - ), + | LightClientOptimisticUpdateError::FailedConstructingUpdate => { + metrics::register_optimistic_update_error(&e); + + debug!( + self.log, + "Light client error constructing optimistic update"; + "peer" => %peer_id, + "error" => ?e, + ) + } } self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 88fcef6b99..8d5bd53aea 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -85,6 +85,7 @@ impl Worker { } }; let slot = block.slot(); + let parent_root = block.message().parent_root(); let available_block = block .into_available_block(block_root, &self.chain) .map_err(BlockError::BlobValidation); @@ -110,7 +111,10 @@ impl Worker { info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash); // Trigger processing for work referencing this block. - let reprocess_msg = ReprocessQueueMessage::BlockImported(hash); + let reprocess_msg = ReprocessQueueMessage::BlockImported { + block_root: hash, + parent_root, + }; if reprocess_tx.try_send(reprocess_msg).is_err() { error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash) }; diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 7544a71d78..ee029e1351 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -357,10 +357,18 @@ lazy_static! { pub static ref BEACON_BLOCK_GOSSIP_SLOT_START_DELAY_TIME: Result = try_create_histogram_with_buckets( "beacon_block_gossip_slot_start_delay_time", "Duration between when the block is received and the start of the slot it belongs to.", + // Create a custom bucket list for greater granularity in block delay + Ok(vec![0.1, 0.2, 0.3,0.4,0.5,0.75,1.0,1.25,1.5,1.75,2.0,2.5,3.0,3.5,4.0,5.0,6.0,7.0,8.0,9.0,10.0,15.0,20.0]) + // NOTE: Previous values, which we may want to switch back to. // [0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50] - decimal_buckets(-1,2) + //decimal_buckets(-1,2) ); + pub static ref BEACON_BLOCK_LAST_DELAY: Result = try_create_int_gauge( + "beacon_block_last_delay", + "Keeps track of the last block's delay from the start of the slot" + ); + pub static ref BEACON_BLOCK_GOSSIP_ARRIVED_LATE_TOTAL: Result = try_create_int_counter( "beacon_block_gossip_arrived_late_total", "Count of times when a gossip block arrived from the network later than the attestation deadline.", @@ -384,6 +392,21 @@ lazy_static! { "Number of queued attestations where as matching block has been imported." ); + /* + * Light client update reprocessing queue metrics. + */ + pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_OPTIMISTIC_UPDATES: Result = try_create_int_counter( + "beacon_processor_reprocessing_queue_expired_optimistic_updates", + "Number of queued light client optimistic updates which have expired before a matching block has been found." + ); + pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_OPTIMISTIC_UPDATES: Result = try_create_int_counter( + "beacon_processor_reprocessing_queue_matched_optimistic_updates", + "Number of queued light client optimistic updates where as matching block has been imported." + ); + pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_SENT_OPTIMISTIC_UPDATES: Result = try_create_int_counter( + "beacon_processor_reprocessing_queue_sent_optimistic_updates", + "Number of queued light client optimistic updates where as matching block has been imported." + ); } pub fn update_bandwidth_metrics(bandwidth: Arc) { diff --git a/beacon_node/operation_pool/src/bls_to_execution_changes.rs b/beacon_node/operation_pool/src/bls_to_execution_changes.rs new file mode 100644 index 0000000000..84513d466e --- /dev/null +++ b/beacon_node/operation_pool/src/bls_to_execution_changes.rs @@ -0,0 +1,105 @@ +use state_processing::SigVerifiedOp; +use std::collections::{hash_map::Entry, HashMap}; +use std::sync::Arc; +use types::{ + AbstractExecPayload, BeaconState, ChainSpec, EthSpec, SignedBeaconBlock, + SignedBlsToExecutionChange, +}; + +/// Pool of BLS to execution changes that maintains a LIFO queue and an index by validator. +/// +/// Using the LIFO queue for block production disincentivises spam on P2P at the Capella fork, +/// and is less-relevant after that. +#[derive(Debug, Default)] +pub struct BlsToExecutionChanges { + /// Map from validator index to BLS to execution change. + by_validator_index: HashMap>>, + /// Last-in-first-out (LIFO) queue of verified messages. + queue: Vec>>, +} + +impl BlsToExecutionChanges { + pub fn existing_change_equals( + &self, + address_change: &SignedBlsToExecutionChange, + ) -> Option { + self.by_validator_index + .get(&address_change.message.validator_index) + .map(|existing| existing.as_inner() == address_change) + } + + pub fn insert( + &mut self, + verified_change: SigVerifiedOp, + ) -> bool { + // Wrap in an `Arc` once on insert. + let verified_change = Arc::new(verified_change); + match self + .by_validator_index + .entry(verified_change.as_inner().message.validator_index) + { + Entry::Vacant(entry) => { + self.queue.push(verified_change.clone()); + entry.insert(verified_change); + true + } + Entry::Occupied(_) => false, + } + } + + /// FIFO ordering, used for persistence to disk. + pub fn iter_fifo( + &self, + ) -> impl Iterator>> { + self.queue.iter() + } + + /// LIFO ordering, used for block packing. + pub fn iter_lifo( + &self, + ) -> impl Iterator>> { + self.queue.iter().rev() + } + + /// Prune BLS to execution changes that have been applied to the state more than 1 block ago. + /// + /// The block check is necessary to avoid pruning too eagerly and losing the ability to include + /// address changes during re-orgs. This is isn't *perfect* so some address changes could + /// still get stuck if there are gnarly re-orgs and the changes can't be widely republished + /// due to the gossip duplicate rules. + pub fn prune>( + &mut self, + head_block: &SignedBeaconBlock, + head_state: &BeaconState, + spec: &ChainSpec, + ) { + let mut validator_indices_pruned = vec![]; + + self.queue.retain(|address_change| { + let validator_index = address_change.as_inner().message.validator_index; + head_state + .validators() + .get(validator_index as usize) + .map_or(true, |validator| { + let prune = validator.has_eth1_withdrawal_credential(spec) + && head_block + .message() + .body() + .bls_to_execution_changes() + .map_or(true, |recent_changes| { + !recent_changes + .iter() + .any(|c| c.message.validator_index == validator_index) + }); + if prune { + validator_indices_pruned.push(validator_index); + } + !prune + }) + }); + + for validator_index in validator_indices_pruned { + self.by_validator_index.remove(&validator_index); + } + } +} diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 70e0d56bc9..4643addad5 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -2,6 +2,7 @@ mod attestation; mod attestation_id; mod attestation_storage; mod attester_slashing; +mod bls_to_execution_changes; mod max_cover; mod metrics; mod persistence; @@ -18,6 +19,7 @@ pub use persistence::{ pub use reward_cache::RewardCache; use crate::attestation_storage::{AttestationMap, CheckpointKey}; +use crate::bls_to_execution_changes::BlsToExecutionChanges; use crate::sync_aggregate_id::SyncAggregateId; use attester_slashing::AttesterSlashingMaxCover; use max_cover::maximum_cover; @@ -51,8 +53,8 @@ pub struct OperationPool { proposer_slashings: RwLock>>, /// Map from exiting validator to their exit data. voluntary_exits: RwLock>>, - /// Map from credential changing validator to their execution change data. - bls_to_execution_changes: RwLock>>, + /// Map from credential changing validator to their position in the queue. + bls_to_execution_changes: RwLock>, /// Reward cache for accelerating attestation packing. reward_cache: RwLock, _phantom: PhantomData, @@ -513,15 +515,28 @@ impl OperationPool { ); } - /// Insert a BLS to execution change into the pool. + /// Check if an address change equal to `address_change` is already in the pool. + /// + /// Return `None` if no address change for the validator index exists in the pool. + pub fn bls_to_execution_change_in_pool_equals( + &self, + address_change: &SignedBlsToExecutionChange, + ) -> Option { + self.bls_to_execution_changes + .read() + .existing_change_equals(address_change) + } + + /// Insert a BLS to execution change into the pool, *only if* no prior change is known. + /// + /// Return `true` if the change was inserted. pub fn insert_bls_to_execution_change( &self, verified_change: SigVerifiedOp, - ) { - self.bls_to_execution_changes.write().insert( - verified_change.as_inner().message.validator_index, - verified_change, - ); + ) -> bool { + self.bls_to_execution_changes + .write() + .insert(verified_change) } /// Get a list of execution changes for inclusion in a block. @@ -533,7 +548,7 @@ impl OperationPool { spec: &ChainSpec, ) -> Vec { filter_limit_operations( - self.bls_to_execution_changes.read().values(), + self.bls_to_execution_changes.read().iter_lifo(), |address_change| { address_change.signature_is_still_valid(&state.fork()) && state @@ -548,33 +563,15 @@ impl OperationPool { } /// Prune BLS to execution changes that have been applied to the state more than 1 block ago. - /// - /// The block check is necessary to avoid pruning too eagerly and losing the ability to include - /// address changes during re-orgs. This is isn't *perfect* so some address changes could - /// still get stuck if there are gnarly re-orgs and the changes can't be widely republished - /// due to the gossip duplicate rules. pub fn prune_bls_to_execution_changes>( &self, head_block: &SignedBeaconBlock, head_state: &BeaconState, spec: &ChainSpec, ) { - prune_validator_hash_map( - &mut self.bls_to_execution_changes.write(), - |validator_index, validator| { - validator.has_eth1_withdrawal_credential(spec) - && head_block - .message() - .body() - .bls_to_execution_changes() - .map_or(true, |recent_changes| { - !recent_changes - .iter() - .any(|c| c.message.validator_index == validator_index) - }) - }, - head_state, - ); + self.bls_to_execution_changes + .write() + .prune(head_block, head_state, spec) } /// Prune all types of transactions given the latest head state and head fork. @@ -663,8 +660,8 @@ impl OperationPool { pub fn get_all_bls_to_execution_changes(&self) -> Vec { self.bls_to_execution_changes .read() - .iter() - .map(|(_, address_change)| address_change.as_inner().clone()) + .iter_fifo() + .map(|address_change| address_change.as_inner().clone()) .collect() } } diff --git a/beacon_node/operation_pool/src/persistence.rs b/beacon_node/operation_pool/src/persistence.rs index 043e6fb7fd..4948040ae1 100644 --- a/beacon_node/operation_pool/src/persistence.rs +++ b/beacon_node/operation_pool/src/persistence.rs @@ -1,5 +1,6 @@ use crate::attestation_id::AttestationId; use crate::attestation_storage::AttestationMap; +use crate::bls_to_execution_changes::BlsToExecutionChanges; use crate::sync_aggregate_id::SyncAggregateId; use crate::OpPoolError; use crate::OperationPool; @@ -105,8 +106,8 @@ impl PersistedOperationPool { let bls_to_execution_changes = operation_pool .bls_to_execution_changes .read() - .iter() - .map(|(_, bls_to_execution_change)| bls_to_execution_change.clone()) + .iter_fifo() + .map(|bls_to_execution_change| (**bls_to_execution_change).clone()) .collect(); PersistedOperationPool::V14(PersistedOperationPoolV14 { @@ -153,18 +154,13 @@ impl PersistedOperationPool { PersistedOperationPool::V5(_) | PersistedOperationPool::V12(_) => { return Err(OpPoolError::IncorrectOpPoolVariant) } - PersistedOperationPool::V14(pool) => RwLock::new( - pool.bls_to_execution_changes - .iter() - .cloned() - .map(|bls_to_execution_change| { - ( - bls_to_execution_change.as_inner().message.validator_index, - bls_to_execution_change, - ) - }) - .collect(), - ), + PersistedOperationPool::V14(pool) => { + let mut bls_to_execution_changes = BlsToExecutionChanges::default(); + for bls_to_execution_change in pool.bls_to_execution_changes { + bls_to_execution_changes.insert(bls_to_execution_change); + } + RwLock::new(bls_to_execution_changes) + } }; let op_pool = OperationPool { attestations, diff --git a/book/src/checkpoint-sync.md b/book/src/checkpoint-sync.md index 736aa08f1c..893c545cb9 100644 --- a/book/src/checkpoint-sync.md +++ b/book/src/checkpoint-sync.md @@ -48,17 +48,6 @@ The Ethereum community provides various [public endpoints](https://eth-clients.g lighthouse bn --checkpoint-sync-url https://example.com/ ... ``` -### Use Infura as a remote beacon node provider - -You can use Infura as the remote beacon node provider to load the initial checkpoint state. - -1. Sign up for the free Infura ETH2 API using the `Create new project tab` on the [Infura dashboard](https://infura.io/dashboard). -2. Copy the HTTPS endpoint for the required network (Mainnet/Prater). -3. Use it as the url for the `--checkpoint-sync-url` flag. e.g. -``` -lighthouse bn --checkpoint-sync-url https://:@eth2-beacon-mainnet.infura.io ... -``` - ## Backfilling Blocks Once forwards sync completes, Lighthouse will commence a "backfill sync" to download the blocks diff --git a/book/src/installation-source.md b/book/src/installation-source.md index b3d83ef9f9..8e515a41bd 100644 --- a/book/src/installation-source.md +++ b/book/src/installation-source.md @@ -64,6 +64,7 @@ choco install protoc These dependencies are for compiling Lighthouse natively on Windows. Lighthouse can also run successfully under the [Windows Subsystem for Linux (WSL)][WSL]. If using Ubuntu under WSL, you should follow the instructions for Ubuntu listed in the [Dependencies (Ubuntu)](#ubuntu) section. + [WSL]: https://docs.microsoft.com/en-us/windows/wsl/about ## Build Lighthouse @@ -128,8 +129,12 @@ Commonly used features include: * `gnosis`: support for the Gnosis Beacon Chain. * `portable`: support for legacy hardware. * `modern`: support for exclusively modern hardware. -* `slasher-mdbx`: support for the MDBX slasher backend (enabled by default). +* `slasher-mdbx`: support for the MDBX slasher backend. Enabled by default. * `slasher-lmdb`: support for the LMDB slasher backend. +* `jemalloc`: use [`jemalloc`][jemalloc] to allocate memory. Enabled by default on Linux and macOS. + Not supported on Windows. + +[jemalloc]: https://jemalloc.net/ ## Compilation Profiles diff --git a/book/src/merge-migration.md b/book/src/merge-migration.md index 08f1b51e42..ec9aeaaee8 100644 --- a/book/src/merge-migration.md +++ b/book/src/merge-migration.md @@ -58,7 +58,7 @@ supported. Each execution engine has its own flags for configuring the engine API and JWT. Please consult the relevant page for your execution engine for the required flags: -- [Geth: Connecting to Consensus Clients](https://geth.ethereum.org/docs/interface/consensus-clients) +- [Geth: Connecting to Consensus Clients](https://geth.ethereum.org/docs/getting-started/consensus-clients) - [Nethermind: Running Nethermind Post Merge](https://docs.nethermind.io/nethermind/first-steps-with-nethermind/running-nethermind-post-merge) - [Besu: Prepare For The Merge](https://besu.hyperledger.org/en/stable/HowTo/Upgrade/Prepare-for-The-Merge/) - [Erigon: Beacon Chain (Consensus Layer)](https://github.com/ledgerwatch/erigon#beacon-chain-consensus-layer) @@ -203,5 +203,5 @@ guidance for specific setups. - [Ethereum.org: The Merge](https://ethereum.org/en/upgrades/merge/) - [Ethereum Staking Launchpad: Merge Readiness](https://launchpad.ethereum.org/en/merge-readiness). - [CoinCashew: Ethereum Merge Upgrade Checklist](https://www.coincashew.com/coins/overview-eth/ethereum-merge-upgrade-checklist-for-home-stakers-and-validators) -- [EthDocker: Merge Preparation](https://eth-docker.net/docs/About/MergePrep/) +- [EthDocker: Merge Preparation](https://eth-docker.net/About/MergePrep/) - [Remy Roy: How to join the Goerli/Prater merge testnet](https://github.com/remyroy/ethstaker/blob/main/merge-goerli-prater.md) diff --git a/book/src/run_a_node.md b/book/src/run_a_node.md index 5ce42aa630..fb112c3675 100644 --- a/book/src/run_a_node.md +++ b/book/src/run_a_node.md @@ -26,7 +26,7 @@ has authority to control the execution engine. Each execution engine has its own flags for configuring the engine API and JWT. Please consult the relevant page of your execution engine for the required flags: -- [Geth: Connecting to Consensus Clients](https://geth.ethereum.org/docs/interface/consensus-clients) +- [Geth: Connecting to Consensus Clients](https://geth.ethereum.org/docs/getting-started/consensus-clients) - [Nethermind: Running Nethermind & CL](https://docs.nethermind.io/nethermind/first-steps-with-nethermind/running-nethermind-post-merge) - [Besu: Connect to Mainnet](https://besu.hyperledger.org/en/stable/public-networks/get-started/connect/mainnet/) - [Erigon: Beacon Chain (Consensus Layer)](https://github.com/ledgerwatch/erigon#beacon-chain-consensus-layer) diff --git a/bors.toml b/bors.toml index 096ac3b29a..9e633d63f5 100644 --- a/bors.toml +++ b/bors.toml @@ -10,7 +10,6 @@ status = [ "merge-transition-ubuntu", "no-eth1-simulator-ubuntu", "check-benchmarks", - "check-consensus", "clippy", "arbitrary-check", "cargo-audit", diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 257819fd83..9e67f6decc 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -1042,6 +1042,24 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST beacon/pool/bls_to_execution_changes` + pub async fn post_beacon_pool_bls_to_execution_changes( + &self, + address_changes: &[SignedBlsToExecutionChange], + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("bls_to_execution_changes"); + + self.post(path, &address_changes).await?; + + Ok(()) + } + /// `GET beacon/deposit_snapshot` pub async fn get_deposit_snapshot(&self) -> Result, Error> { use ssz::Decode; @@ -1056,6 +1074,24 @@ impl BeaconNodeHttpClient { .transpose() } + /// `POST beacon/rewards/sync_committee` + pub async fn post_beacon_rewards_sync_committee( + &self, + rewards: &[Option>], + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("rewards") + .push("sync_committee"); + + self.post(path, &rewards).await?; + + Ok(()) + } + /// `POST validator/contribution_and_proofs` pub async fn post_validator_contribution_and_proofs( &self, diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 2dced1c449..068abd693a 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -3,6 +3,7 @@ mod attestation_performance; mod block_packing_efficiency; mod block_rewards; +mod sync_committee_rewards; use crate::{ ok_or_error, @@ -27,6 +28,7 @@ pub use block_packing_efficiency::{ }; pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery}; pub use lighthouse_network::{types::SyncState, PeerInfo}; +pub use sync_committee_rewards::SyncCommitteeReward; // Define "legacy" implementations of `Option` which use four bytes for encoding the union // selector. diff --git a/common/eth2/src/lighthouse/sync_committee_rewards.rs b/common/eth2/src/lighthouse/sync_committee_rewards.rs new file mode 100644 index 0000000000..cdd6850650 --- /dev/null +++ b/common/eth2/src/lighthouse/sync_committee_rewards.rs @@ -0,0 +1,12 @@ +use serde::{Deserialize, Serialize}; + +// Details about the rewards paid to sync committee members for attesting headers +// All rewards in GWei + +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +pub struct SyncCommitteeReward { + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub validator_index: u64, + // sync committee reward in gwei for the validator + pub reward: i64, +} diff --git a/common/malloc_utils/Cargo.toml b/common/malloc_utils/Cargo.toml index 569eed6082..c88ec0bd5a 100644 --- a/common/malloc_utils/Cargo.toml +++ b/common/malloc_utils/Cargo.toml @@ -4,13 +4,21 @@ version = "0.1.0" authors = ["Paul Hauner "] edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] lighthouse_metrics = { path = "../lighthouse_metrics" } lazy_static = "1.4.0" libc = "0.2.79" parking_lot = "0.12.0" +jemalloc-ctl = { version = "0.5.0", optional = true } + +# Jemalloc's background_threads feature requires Linux (pthreads). +[target.'cfg(target_os = "linux")'.dependencies] +jemallocator = { version = "0.5.0", optional = true, features = ["stats", "background_threads"] } + +[target.'cfg(not(target_os = "linux"))'.dependencies] +jemallocator = { version = "0.5.0", optional = true, features = ["stats"] } [features] mallinfo2 = [] +jemalloc = ["jemallocator", "jemalloc-ctl"] +jemalloc-profiling = ["jemallocator/profiling"] diff --git a/common/malloc_utils/src/jemalloc.rs b/common/malloc_utils/src/jemalloc.rs new file mode 100644 index 0000000000..c796ea39a1 --- /dev/null +++ b/common/malloc_utils/src/jemalloc.rs @@ -0,0 +1,52 @@ +//! Set the allocator to `jemalloc`. +//! +//! Due to `jemalloc` requiring configuration at compile time or immediately upon runtime +//! initialisation it is configured via a Cargo config file in `.cargo/config.toml`. +//! +//! The `jemalloc` tuning can be overriden by: +//! +//! A) `JEMALLOC_SYS_WITH_MALLOC_CONF` at compile-time. +//! B) `_RJEM_MALLOC_CONF` at runtime. +use jemalloc_ctl::{arenas, epoch, stats, Error}; +use lazy_static::lazy_static; +use lighthouse_metrics::{set_gauge, try_create_int_gauge, IntGauge}; + +#[global_allocator] +static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; + +// Metrics for jemalloc. +lazy_static! { + pub static ref NUM_ARENAS: lighthouse_metrics::Result = + try_create_int_gauge("jemalloc_num_arenas", "The number of arenas in use"); + pub static ref BYTES_ALLOCATED: lighthouse_metrics::Result = + try_create_int_gauge("jemalloc_bytes_allocated", "Equivalent to stats.allocated"); + pub static ref BYTES_ACTIVE: lighthouse_metrics::Result = + try_create_int_gauge("jemalloc_bytes_active", "Equivalent to stats.active"); + pub static ref BYTES_MAPPED: lighthouse_metrics::Result = + try_create_int_gauge("jemalloc_bytes_mapped", "Equivalent to stats.mapped"); + pub static ref BYTES_METADATA: lighthouse_metrics::Result = + try_create_int_gauge("jemalloc_bytes_metadata", "Equivalent to stats.metadata"); + pub static ref BYTES_RESIDENT: lighthouse_metrics::Result = + try_create_int_gauge("jemalloc_bytes_resident", "Equivalent to stats.resident"); + pub static ref BYTES_RETAINED: lighthouse_metrics::Result = + try_create_int_gauge("jemalloc_bytes_retained", "Equivalent to stats.retained"); +} + +pub fn scrape_jemalloc_metrics() { + scrape_jemalloc_metrics_fallible().unwrap() +} + +pub fn scrape_jemalloc_metrics_fallible() -> Result<(), Error> { + // Advance the epoch so that the underlying statistics are updated. + epoch::advance()?; + + set_gauge(&NUM_ARENAS, arenas::narenas::read()? as i64); + set_gauge(&BYTES_ALLOCATED, stats::allocated::read()? as i64); + set_gauge(&BYTES_ACTIVE, stats::active::read()? as i64); + set_gauge(&BYTES_MAPPED, stats::mapped::read()? as i64); + set_gauge(&BYTES_METADATA, stats::metadata::read()? as i64); + set_gauge(&BYTES_RESIDENT, stats::resident::read()? as i64); + set_gauge(&BYTES_RETAINED, stats::retained::read()? as i64); + + Ok(()) +} diff --git a/common/malloc_utils/src/lib.rs b/common/malloc_utils/src/lib.rs index b8aed948f8..3bb242369f 100644 --- a/common/malloc_utils/src/lib.rs +++ b/common/malloc_utils/src/lib.rs @@ -2,18 +2,18 @@ //! //! ## Conditional Compilation //! -//! Presently, only configuration for "The GNU Allocator" from `glibc` is supported. All other -//! allocators are ignored. +//! This crate can be compiled with different feature flags to support different allocators: //! -//! It is assumed that if the following two statements are correct then we should expect to -//! configure `glibc`: +//! - Jemalloc, via the `jemalloc` feature. +//! - GNU malloc, if no features are set and the system supports it. +//! - The system allocator, if no features are set and the allocator is not GNU malloc. +//! +//! It is assumed that if Jemalloc is not in use, and the following two statements are correct then +//! we should expect to configure `glibc`: //! //! - `target_os = linux` //! - `target_env != musl` //! -//! In all other cases this library will not attempt to do anything (i.e., all functions are -//! no-ops). -//! //! If the above conditions are fulfilled but `glibc` still isn't present at runtime then a panic //! may be triggered. It is understood that there's no way to be certain that a compatible `glibc` //! is present: https://github.com/rust-lang/rust/issues/33244. @@ -24,18 +24,42 @@ //! detecting `glibc` are best-effort. If this crate throws errors about undefined external //! functions, then try to compile with the `not_glibc_interface` module. -#[cfg(all(target_os = "linux", not(target_env = "musl")))] +#[cfg(all( + target_os = "linux", + not(target_env = "musl"), + not(feature = "jemalloc") +))] mod glibc; +#[cfg(feature = "jemalloc")] +mod jemalloc; + pub use interface::*; -#[cfg(all(target_os = "linux", not(target_env = "musl")))] +#[cfg(all( + target_os = "linux", + not(target_env = "musl"), + not(feature = "jemalloc") +))] mod interface { pub use crate::glibc::configure_glibc_malloc as configure_memory_allocator; pub use crate::glibc::scrape_mallinfo_metrics as scrape_allocator_metrics; } -#[cfg(any(not(target_os = "linux"), target_env = "musl"))] +#[cfg(feature = "jemalloc")] +mod interface { + #[allow(dead_code)] + pub fn configure_memory_allocator() -> Result<(), String> { + Ok(()) + } + + pub use crate::jemalloc::scrape_jemalloc_metrics as scrape_allocator_metrics; +} + +#[cfg(all( + any(not(target_os = "linux"), target_env = "musl"), + not(feature = "jemalloc") +))] mod interface { #[allow(dead_code, clippy::unnecessary_wraps)] pub fn configure_memory_allocator() -> Result<(), String> { diff --git a/consensus/state_processing/src/verify_operation.rs b/consensus/state_processing/src/verify_operation.rs index efd356462d..50ac2ff3de 100644 --- a/consensus/state_processing/src/verify_operation.rs +++ b/consensus/state_processing/src/verify_operation.rs @@ -67,7 +67,7 @@ where fn new(op: T, state: &BeaconState) -> Self { let verified_against = VerifiedAgainst { fork_versions: op - .verification_epochs(state.current_epoch()) + .verification_epochs() .into_iter() .map(|epoch| state.fork().get_fork_version(epoch)) .collect(), @@ -89,13 +89,9 @@ where } pub fn signature_is_still_valid(&self, current_fork: &Fork) -> bool { - // Pass the fork's epoch as the effective current epoch. If the message is a current-epoch - // style message like `SignedBlsToExecutionChange` then `get_fork_version` will return the - // current fork version and we'll check it matches the fork version the message was checked - // against. - let effective_current_epoch = current_fork.epoch; + // The .all() will return true if the iterator is empty. self.as_inner() - .verification_epochs(effective_current_epoch) + .verification_epochs() .into_iter() .zip(self.verified_against.fork_versions.iter()) .all(|(epoch, verified_fork_version)| { @@ -126,12 +122,8 @@ pub trait VerifyOperation: Encode + Decode + Sized { /// /// These need to map 1-to-1 to the `SigVerifiedOp::verified_against` for this type. /// - /// If the message contains no inherent epoch it should return the `current_epoch` that is - /// passed in, as that's the epoch at which it was verified. - fn verification_epochs( - &self, - current_epoch: Epoch, - ) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]>; + /// If the message is valid across all forks it should return an empty smallvec. + fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]>; } impl VerifyOperation for SignedVoluntaryExit { @@ -147,7 +139,7 @@ impl VerifyOperation for SignedVoluntaryExit { } #[allow(clippy::integer_arithmetic)] - fn verification_epochs(&self, _: Epoch) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> { + fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> { smallvec![self.message.epoch] } } @@ -165,7 +157,7 @@ impl VerifyOperation for AttesterSlashing { } #[allow(clippy::integer_arithmetic)] - fn verification_epochs(&self, _: Epoch) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> { + fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> { smallvec![ self.attestation_1.data.target.epoch, self.attestation_2.data.target.epoch @@ -186,7 +178,7 @@ impl VerifyOperation for ProposerSlashing { } #[allow(clippy::integer_arithmetic)] - fn verification_epochs(&self, _: Epoch) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> { + fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> { // Only need a single epoch because the slots of the two headers must be equal. smallvec![self .signed_header_1 @@ -209,10 +201,7 @@ impl VerifyOperation for SignedBlsToExecutionChange { } #[allow(clippy::integer_arithmetic)] - fn verification_epochs( - &self, - current_epoch: Epoch, - ) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> { - smallvec![current_epoch] + fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> { + smallvec![] } } diff --git a/consensus/types/src/beacon_state/tests.rs b/consensus/types/src/beacon_state/tests.rs index abca10e372..d63eaafc4b 100644 --- a/consensus/types/src/beacon_state/tests.rs +++ b/consensus/types/src/beacon_state/tests.rs @@ -2,7 +2,7 @@ use crate::test_utils::*; use crate::test_utils::{SeedableRng, XorShiftRng}; use beacon_chain::test_utils::{ - interop_genesis_state, test_spec, BeaconChainHarness, EphemeralHarnessType, + interop_genesis_state_with_eth1, test_spec, BeaconChainHarness, EphemeralHarnessType, DEFAULT_ETH1_BLOCK_HASH, }; use beacon_chain::types::{ @@ -551,7 +551,7 @@ fn tree_hash_cache_linear_history_long_skip() { let spec = &test_spec::(); // This state has a cache that advances normally each slot. - let mut state: BeaconState = interop_genesis_state( + let mut state: BeaconState = interop_genesis_state_with_eth1( &keypairs, 0, Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH), diff --git a/consensus/types/src/bls_to_execution_change.rs b/consensus/types/src/bls_to_execution_change.rs index f6064f65ab..cb73e43f9a 100644 --- a/consensus/types/src/bls_to_execution_change.rs +++ b/consensus/types/src/bls_to_execution_change.rs @@ -28,6 +28,26 @@ pub struct BlsToExecutionChange { impl SignedRoot for BlsToExecutionChange {} +impl BlsToExecutionChange { + pub fn sign( + self, + secret_key: &SecretKey, + genesis_validators_root: Hash256, + spec: &ChainSpec, + ) -> SignedBlsToExecutionChange { + let domain = spec.compute_domain( + Domain::BlsToExecutionChange, + spec.genesis_fork_version, + genesis_validators_root, + ); + let message = self.signing_root(domain); + SignedBlsToExecutionChange { + message: self, + signature: secret_key.sign(message), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index 8ed0575e51..821f5c57ec 100644 --- a/lcli/Cargo.toml +++ b/lcli/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [features] portable = ["bls/supranational-portable"] fake_crypto = ['bls/fake_crypto'] +jemalloc = ["malloc_utils/jemalloc"] [dependencies] bls = { path = "../crypto/bls" } @@ -42,3 +43,7 @@ eth2 = { path = "../common/eth2" } snap = "1.0.1" beacon_chain = { path = "../beacon_node/beacon_chain" } store = { path = "../beacon_node/store" } +malloc_utils = { path = "../common/malloc_utils" } + +[package.metadata.cargo-udeps.ignore] +normal = ["malloc_utils"] diff --git a/lcli/src/main.rs b/lcli/src/main.rs index dd075531e8..92ecc78ed1 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -827,6 +827,7 @@ fn run( debug_level: String::from("trace"), logfile_debug_level: String::from("trace"), log_format: None, + logfile_format: None, log_color: false, disable_log_timestamp: false, max_log_size: 0, diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index f39fc514be..cc43660598 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -24,6 +24,8 @@ gnosis = [] slasher-mdbx = ["slasher/mdbx"] # Support slasher LMDB backend. slasher-lmdb = ["slasher/lmdb"] +# Use jemalloc. +jemalloc = ["malloc_utils/jemalloc"] [dependencies] beacon_node = { "path" = "../beacon_node" } diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index fad7edeb19..8ef67e82dd 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -50,6 +50,7 @@ pub struct LoggerConfig { pub debug_level: String, pub logfile_debug_level: String, pub log_format: Option, + pub logfile_format: Option, pub log_color: bool, pub disable_log_timestamp: bool, pub max_log_size: u64, @@ -64,6 +65,7 @@ impl Default for LoggerConfig { debug_level: String::from("info"), logfile_debug_level: String::from("debug"), log_format: None, + logfile_format: None, log_color: false, disable_log_timestamp: false, max_log_size: 200, @@ -252,7 +254,7 @@ impl EnvironmentBuilder { let file_logger = FileLoggerBuilder::new(&path) .level(logfile_level) .channel_size(LOG_CHANNEL_SIZE) - .format(match config.log_format.as_deref() { + .format(match config.logfile_format.as_deref() { Some("JSON") => Format::Json, _ => Format::default(), }) diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index da72204f96..babe2f8dca 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -31,6 +31,14 @@ fn bls_library_name() -> &'static str { } } +fn allocator_name() -> &'static str { + if cfg!(feature = "jemalloc") { + "jemalloc" + } else { + "system" + } +} + fn main() { // Enable backtraces unless a RUST_BACKTRACE value has already been explicitly provided. if std::env::var("RUST_BACKTRACE").is_err() { @@ -51,10 +59,12 @@ fn main() { "{}\n\ BLS library: {}\n\ SHA256 hardware acceleration: {}\n\ + Allocator: {}\n\ Specs: mainnet (true), minimal ({}), gnosis ({})", VERSION.replace("Lighthouse/", ""), bls_library_name(), have_sha_extensions(), + allocator_name(), cfg!(feature = "spec-minimal"), cfg!(feature = "gnosis"), ).as_str() @@ -99,6 +109,15 @@ fn main() { .default_value("debug") .global(true), ) + .arg( + Arg::with_name("logfile-format") + .long("logfile-format") + .value_name("FORMAT") + .help("Specifies the log format used when emitting logs to the logfile.") + .possible_values(&["DEFAULT", "JSON"]) + .takes_value(true) + .global(true) + ) .arg( Arg::with_name("logfile-max-size") .long("logfile-max-size") @@ -402,6 +421,11 @@ fn run( .value_of("logfile-debug-level") .ok_or("Expected --logfile-debug-level flag")?; + let logfile_format = matches + .value_of("logfile-format") + // Ensure that `logfile-format` defaults to the value of `log-format`. + .or_else(|| matches.value_of("log-format")); + let logfile_max_size: u64 = matches .value_of("logfile-max-size") .ok_or("Expected --logfile-max-size flag")? @@ -452,6 +476,7 @@ fn run( debug_level: String::from(debug_level), logfile_debug_level: String::from(logfile_debug_level), log_format: log_format.map(String::from), + logfile_format: logfile_format.map(String::from), log_color, disable_log_timestamp, max_log_size: logfile_max_size * 1_024 * 1_024, diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 4a2e160e8b..7e581ee615 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1662,7 +1662,24 @@ fn logfile_no_restricted_perms_flag() { assert!(config.logger_config.is_restricted == false); }); } - +#[test] +fn logfile_format_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert_eq!(config.logger_config.logfile_format, None)); +} +#[test] +fn logfile_format_flag() { + CommandLineTest::new() + .flag("logfile-format", Some("JSON")) + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.logger_config.logfile_format, + Some("JSON".to_string()) + ) + }); +} #[test] fn sync_eth1_chain_default() { CommandLineTest::new() diff --git a/testing/antithesis/Dockerfile.libvoidstar b/testing/antithesis/Dockerfile.libvoidstar index a92bf6dbd6..df0fc71b53 100644 --- a/testing/antithesis/Dockerfile.libvoidstar +++ b/testing/antithesis/Dockerfile.libvoidstar @@ -1,11 +1,9 @@ -FROM rust:1.62.1-bullseye AS builder -RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake clang libclang-dev +FROM rust:1.66.1-bullseye AS builder +RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake libclang-dev clang protobuf-compiler COPY . lighthouse # Build lighthouse directly with a cargo build command, bypassing the Makefile. -# We have to use nightly in order to disable the new LLVM pass manager. -RUN rustup default nightly-2022-07-26 && cd lighthouse && LD_LIBRARY_PATH=/lighthouse/testing/antithesis/libvoidstar/ RUSTFLAGS="-Znew-llvm-pass-manager=no -Cpasses=sancov -Cllvm-args=-sanitizer-coverage-level=3 -Cllvm-args=-sanitizer-coverage-trace-pc-guard -Ccodegen-units=1 -Cdebuginfo=2 -L/lighthouse/testing/antithesis/libvoidstar/ -lvoidstar" cargo build --release --manifest-path lighthouse/Cargo.toml --target x86_64-unknown-linux-gnu --features modern --verbose --bin lighthouse - +RUN cd lighthouse && LD_LIBRARY_PATH=/lighthouse/testing/antithesis/libvoidstar/ RUSTFLAGS="-Cpasses=sancov-module -Cllvm-args=-sanitizer-coverage-level=3 -Cllvm-args=-sanitizer-coverage-trace-pc-guard -Ccodegen-units=1 -Cdebuginfo=2 -L/lighthouse/testing/antithesis/libvoidstar/ -lvoidstar" cargo build --release --manifest-path lighthouse/Cargo.toml --target x86_64-unknown-linux-gnu --features modern --verbose --bin lighthouse # build lcli binary directly with cargo install command, bypassing the makefile RUN cargo install --path /lighthouse/lcli --force --locked diff --git a/testing/simulator/src/eth1_sim.rs b/testing/simulator/src/eth1_sim.rs index 8284bff609..42aefea7a5 100644 --- a/testing/simulator/src/eth1_sim.rs +++ b/testing/simulator/src/eth1_sim.rs @@ -62,6 +62,7 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { debug_level: String::from("debug"), logfile_debug_level: String::from("debug"), log_format: None, + logfile_format: None, log_color: false, disable_log_timestamp: false, max_log_size: 0, diff --git a/testing/simulator/src/no_eth1_sim.rs b/testing/simulator/src/no_eth1_sim.rs index 53c4447da2..1a026ded46 100644 --- a/testing/simulator/src/no_eth1_sim.rs +++ b/testing/simulator/src/no_eth1_sim.rs @@ -47,6 +47,7 @@ pub fn run_no_eth1_sim(matches: &ArgMatches) -> Result<(), String> { debug_level: String::from("debug"), logfile_debug_level: String::from("debug"), log_format: None, + logfile_format: None, log_color: false, disable_log_timestamp: false, max_log_size: 0, diff --git a/testing/simulator/src/sync_sim.rs b/testing/simulator/src/sync_sim.rs index 1c8b41f057..9d759715eb 100644 --- a/testing/simulator/src/sync_sim.rs +++ b/testing/simulator/src/sync_sim.rs @@ -51,6 +51,7 @@ fn syncing_sim( debug_level: String::from(log_level), logfile_debug_level: String::from("debug"), log_format: log_format.map(String::from), + logfile_format: None, log_color: false, disable_log_timestamp: false, max_log_size: 0, diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index e4081f9645..6fd519ebaf 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -335,6 +335,11 @@ impl BlockService { let proposer_index = self.validator_store.validator_index(&validator_pubkey); let validator_pubkey_ref = &validator_pubkey; + info!( + log, + "Requesting unsigned block"; + "slot" => slot.as_u64(), + ); // Request block from first responsive beacon node. let block = self .beacon_nodes @@ -385,6 +390,11 @@ impl BlockService { } }; + info!( + log, + "Received unsigned block"; + "slot" => slot.as_u64(), + ); if proposer_index != Some(block.proposer_index()) { return Err(BlockError::Recoverable( "Proposer index does not match block proposer. Beacon chain re-orged" @@ -403,6 +413,11 @@ impl BlockService { .await .map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))?; + info!( + log, + "Publishing signed block"; + "slot" => slot.as_u64(), + ); // Publish block with first available beacon node. self.beacon_nodes .first_success( diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 4db9804054..00c3db7aa1 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -31,6 +31,7 @@ use crate::beacon_node_fallback::{ }; use crate::doppelganger_service::DoppelgangerService; use crate::graffiti_file::GraffitiFile; +use crate::initialized_validators::Error::UnableToOpenVotingKeystore; use account_utils::validator_definitions::ValidatorDefinitions; use attestation_service::{AttestationService, AttestationServiceBuilder}; use block_service::{BlockService, BlockServiceBuilder}; @@ -184,7 +185,16 @@ impl ProductionValidatorClient { log.clone(), ) .await - .map_err(|e| format!("Unable to initialize validators: {:?}", e))?; + .map_err(|e| { + match e { + UnableToOpenVotingKeystore(err) => { + format!("Unable to initialize validators: {:?}. If you have recently moved the location of your data directory \ + make sure to update the location of voting_keystore_path in your validator_definitions.yml", err) + }, + err => { + format!("Unable to initialize validators: {:?}", err)} + } + })?; let voting_pubkeys: Vec<_> = validators.iter_voting_pubkeys().collect();