diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index a94a19900c..d86abd0721 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -196,7 +196,7 @@ jobs: - name: Run network tests for all known forks run: make test-network env: - TEST_FEATURES: portable,ci_logger + TEST_FEATURES: portable CI_LOGGER_DIR: ${{ runner.temp }}/network_test_logs - name: Upload logs uses: actions/upload-artifact@v4 diff --git a/Cargo.lock b/Cargo.lock index 204bc39082..1b506f6212 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -813,6 +813,8 @@ dependencies = [ "maplit", "merkle_proof", "metrics", + "mockall", + "mockall_double", "once_cell", "oneshot_broadcast", "operation_pool", @@ -2376,6 +2378,12 @@ dependencies = [ "validator_store", ] +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "dtoa" version = "1.0.10" @@ -3504,6 +3512,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619" + [[package]] name = "fs2" version = "0.4.3" @@ -5932,6 +5946,44 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9366861eb2a2c436c20b12c8dbec5f798cea6b47ad99216be0282942e2c81ea0" +[[package]] +name = "mockall" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.101", +] + +[[package]] +name = "mockall_double" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1ca96e5ac35256ae3e13536edd39b172b88f41615e1d7b653c8ad24524113e8" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "mockito" version = "1.7.0" @@ -6898,6 +6950,32 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "predicates" +version = "3.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5d19ee57562043d37e82899fade9a22ebab7be9cef5026b07fda9cdd4293573" +dependencies = [ + "anstyle", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "727e462b119fe9c93fd0eb1429a5f7647394014cf3c04ab2c0350eeb09095ffa" + +[[package]] +name = "predicates-tree" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72dd2d6d381dfb73a193c7fca536518d7caee39fc8503f74e7dc0be0531b425c" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "pretty_reqwest_error" version = "0.1.0" @@ -8902,6 +8980,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "termtree" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" + [[package]] name = "test_random_derive" version = "0.2.0" @@ -9804,6 +9888,7 @@ dependencies = [ name = "validator_store" version = "0.1.0" dependencies = [ + "eth2", "slashing_protection", "types", ] @@ -10059,6 +10144,7 @@ dependencies = [ "account_utils", "async-channel 1.9.0", "environment", + "eth2", "eth2_keystore", "eth2_network_config", "futures", diff --git a/Cargo.toml b/Cargo.toml index 86cca0a259..952b43a66b 
100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,6 @@ [workspace] members = [ "account_manager", - "beacon_node", "beacon_node/beacon_chain", "beacon_node/beacon_processor", @@ -17,9 +16,7 @@ members = [ "beacon_node/operation_pool", "beacon_node/store", "beacon_node/timer", - "boot_node", - "common/account_utils", "common/clap_utils", "common/compare_fields", @@ -52,7 +49,6 @@ members = [ "common/validator_dir", "common/warp_utils", "common/workspace_members", - "consensus/context_deserialize", "consensus/context_deserialize_derive", "consensus/fixed_bytes", @@ -64,23 +60,17 @@ members = [ "consensus/state_processing", "consensus/swap_or_not_shuffle", "consensus/types", - "crypto/bls", "crypto/eth2_key_derivation", "crypto/eth2_keystore", "crypto/eth2_wallet", "crypto/kzg", - "database_manager", - "lcli", - "lighthouse", "lighthouse/environment", - "slasher", "slasher/service", - "testing/ef_tests", "testing/eth1_test_rig", "testing/execution_engine_integration", @@ -89,8 +79,6 @@ members = [ "testing/state_transition_vectors", "testing/validator_test_rig", "testing/web3signer_tests", - - "validator_client", "validator_client/beacon_node_fallback", "validator_client/doppelganger_service", @@ -103,7 +91,6 @@ members = [ "validator_client/slashing_protection", "validator_client/validator_metrics", "validator_client/validator_services", - "validator_manager", ] resolver = "2" @@ -112,63 +99,109 @@ resolver = "2" edition = "2021" [workspace.dependencies] +account_utils = { path = "common/account_utils" } +alloy-consensus = "0.3.0" alloy-primitives = { version = "0.8", features = ["rlp", "getrandom"] } alloy-rlp = "0.3.4" -alloy-consensus = "0.3.0" anyhow = "1" arbitrary = { version = "1", features = ["derive"] } async-channel = "1.9.0" axum = "0.7.7" +beacon_chain = { path = "beacon_node/beacon_chain" } +beacon_node = { path = "beacon_node" } +beacon_node_fallback = { path = "validator_client/beacon_node_fallback" } +beacon_processor = { path = 
"beacon_node/beacon_processor" } bincode = "1" bitvec = "1" +bls = { path = "crypto/bls" } byteorder = "1" bytes = "1" -cargo_metadata = "0.19" -clap = { version = "4.5.4", features = ["derive", "cargo", "wrap_help"] } # Turn off c-kzg's default features which include `blst/portable`. We can turn on blst's portable # feature ourselves when desired. c-kzg = { version = "1", default-features = false } +cargo_metadata = "0.19" +clap = { version = "4.5.4", features = ["derive", "cargo", "wrap_help"] } +clap_utils = { path = "common/clap_utils" } +compare_fields = { path = "common/compare_fields" } compare_fields_derive = { path = "common/compare_fields_derive" } context_deserialize = { path = "consensus/context_deserialize" } context_deserialize_derive = { path = "consensus/context_deserialize_derive" } criterion = "0.5" delay_map = "0.4" +deposit_contract = { path = "common/deposit_contract" } derivative = "2" +directory = { path = "common/directory" } dirs = "3" -either = "1.9" -rust_eth_kzg = "0.5.4" discv5 = { version = "0.9", features = ["libp2p"] } +doppelganger_service = { path = "validator_client/doppelganger_service" } +either = "1.9" env_logger = "0.9" +environment = { path = "lighthouse/environment" } +eth1 = { path = "beacon_node/eth1" } +eth1_test_rig = { path = "testing/eth1_test_rig" } +eth2 = { path = "common/eth2" } +eth2_config = { path = "common/eth2_config" } +eth2_key_derivation = { path = "crypto/eth2_key_derivation" } +eth2_keystore = { path = "crypto/eth2_keystore" } +eth2_network_config = { path = "common/eth2_network_config" } +eth2_wallet = { path = "crypto/eth2_wallet" } ethereum_hashing = "0.7.0" ethereum_serde_utils = "0.7" ethereum_ssz = "0.8.2" ethereum_ssz_derive = "0.8.2" ethers-core = "1" +ethers-middleware = { version = "1", default-features = false } ethers-providers = { version = "1", default-features = false } ethers-signers = { version = "1", default-features = false } -ethers-middleware = { version = "1", default-features = 
false } +execution_layer = { path = "beacon_node/execution_layer" } exit-future = "0.2" +filesystem = { path = "common/filesystem" } +fixed_bytes = { path = "consensus/fixed_bytes" } fnv = "1" +fork_choice = { path = "consensus/fork_choice" } fs2 = "0.4" futures = "0.3" -graffiti_file = { path = "validator_client/graffiti_file" } +genesis = { path = "beacon_node/genesis" } gossipsub = { package = "libp2p-gossipsub", git = "https://github.com/sigp/rust-libp2p.git", rev = "61b2820" } -hex = "0.4" +graffiti_file = { path = "validator_client/graffiti_file" } hashlink = "0.9.0" +health_metrics = { path = "common/health_metrics" } +hex = "0.4" +http_api = { path = "beacon_node/http_api" } hyper = "1" +initialized_validators = { path = "validator_client/initialized_validators" } +int_to_bytes = { path = "consensus/int_to_bytes" } itertools = "0.10" +kzg = { path = "crypto/kzg" } libsecp256k1 = "0.7" +lighthouse_network = { path = "beacon_node/lighthouse_network" } +lighthouse_validator_store = { path = "validator_client/lighthouse_validator_store" } +lighthouse_version = { path = "common/lighthouse_version" } +lockfile = { path = "common/lockfile" } log = "0.4" -logroller = "0.1.4" +logging = { path = "common/logging" } +logroller = "0.1.8" lru = "0.12" +lru_cache = { path = "common/lru_cache" } +malloc_utils = { path = "common/malloc_utils" } maplit = "1" +merkle_proof = { path = "consensus/merkle_proof" } +metrics = { path = "common/metrics" } milhouse = "0.5" +mockall = "0.13" +mockall_double = "0.3" mockito = "1.5.0" +monitoring_api = { path = "common/monitoring_api" } +network = { path = "beacon_node/network" } +node_test_rig = { path = "testing/node_test_rig" } num_cpus = "1" once_cell = "1.17.1" +operation_pool = { path = "beacon_node/operation_pool" } parking_lot = "0.12" paste = "1" +pretty_reqwest_error = { path = "common/pretty_reqwest_error" } prometheus = { version = "0.13", default-features = false } +proto_array = { path = "consensus/proto_array" } 
quickcheck = "1" quickcheck_macros = "1" quote = "1" @@ -186,18 +219,30 @@ reqwest = { version = "0.11", default-features = false, features = [ ring = "0.17" rpds = "0.11" rusqlite = { version = "0.28", features = ["bundled"] } +rust_eth_kzg = "0.5.4" +safe_arith = { path = "consensus/safe_arith" } +sensitive_url = { path = "common/sensitive_url" } serde = { version = "1", features = ["derive"] } serde_json = "1" serde_repr = "0.1" serde_yaml = "0.9" sha2 = "0.9" +signing_method = { path = "validator_client/signing_method" } +slasher = { path = "slasher", default-features = false } +slashing_protection = { path = "validator_client/slashing_protection" } +slot_clock = { path = "common/slot_clock" } smallvec = { version = "1.11.2", features = ["arbitrary"] } snap = "1" ssz_types = "0.10" +state_processing = { path = "consensus/state_processing" } +store = { path = "beacon_node/store" } strum = { version = "0.24", features = ["derive"] } superstruct = "0.8" +swap_or_not_shuffle = { path = "consensus/swap_or_not_shuffle" } syn = "1" sysinfo = "0.26" +system_health = { path = "common/system_health" } +task_executor = { path = "common/task_executor" } tempfile = "3" tokio = { version = "1", features = [ "rt-multi-thread", @@ -214,72 +259,10 @@ tracing-log = "0.2" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } tree_hash = "0.9" tree_hash_derive = "0.9" -url = "2" -uuid = { version = "0.8", features = ["serde", "v4"] } -warp = { version = "0.3.7", default-features = false, features = ["tls"] } -zeroize = { version = "1", features = ["zeroize_derive", "serde"] } -zip = "0.6" - -# Local crates. 
-account_utils = { path = "common/account_utils" } -beacon_chain = { path = "beacon_node/beacon_chain" } -beacon_node = { path = "beacon_node" } -beacon_node_fallback = { path = "validator_client/beacon_node_fallback" } -beacon_processor = { path = "beacon_node/beacon_processor" } -bls = { path = "crypto/bls" } -clap_utils = { path = "common/clap_utils" } -compare_fields = { path = "common/compare_fields" } -deposit_contract = { path = "common/deposit_contract" } -directory = { path = "common/directory" } -doppelganger_service = { path = "validator_client/doppelganger_service" } -environment = { path = "lighthouse/environment" } -eth1 = { path = "beacon_node/eth1" } -eth1_test_rig = { path = "testing/eth1_test_rig" } -eth2 = { path = "common/eth2" } -eth2_config = { path = "common/eth2_config" } -eth2_key_derivation = { path = "crypto/eth2_key_derivation" } -eth2_keystore = { path = "crypto/eth2_keystore" } -eth2_network_config = { path = "common/eth2_network_config" } -eth2_wallet = { path = "crypto/eth2_wallet" } -execution_layer = { path = "beacon_node/execution_layer" } -fixed_bytes = { path = "consensus/fixed_bytes" } -filesystem = { path = "common/filesystem" } -fork_choice = { path = "consensus/fork_choice" } -genesis = { path = "beacon_node/genesis" } -health_metrics = { path = "common/health_metrics" } -http_api = { path = "beacon_node/http_api" } -initialized_validators = { path = "validator_client/initialized_validators" } -int_to_bytes = { path = "consensus/int_to_bytes" } -kzg = { path = "crypto/kzg" } -metrics = { path = "common/metrics" } -lighthouse_network = { path = "beacon_node/lighthouse_network" } -lighthouse_validator_store = { path = "validator_client/lighthouse_validator_store" } -lighthouse_version = { path = "common/lighthouse_version" } -workspace_members = { path = "common/workspace_members" } -lockfile = { path = "common/lockfile" } -logging = { path = "common/logging" } -lru_cache = { path = "common/lru_cache" } -malloc_utils = { path 
= "common/malloc_utils" } -merkle_proof = { path = "consensus/merkle_proof" } -monitoring_api = { path = "common/monitoring_api" } -network = { path = "beacon_node/network" } -node_test_rig = { path = "testing/node_test_rig" } -operation_pool = { path = "beacon_node/operation_pool" } -pretty_reqwest_error = { path = "common/pretty_reqwest_error" } -proto_array = { path = "consensus/proto_array" } -safe_arith = { path = "consensus/safe_arith" } -sensitive_url = { path = "common/sensitive_url" } -signing_method = { path = "validator_client/signing_method" } -slasher = { path = "slasher", default-features = false } -slashing_protection = { path = "validator_client/slashing_protection" } -slot_clock = { path = "common/slot_clock" } -state_processing = { path = "consensus/state_processing" } -store = { path = "beacon_node/store" } -swap_or_not_shuffle = { path = "consensus/swap_or_not_shuffle" } -system_health = { path = "common/system_health" } -task_executor = { path = "common/task_executor" } types = { path = "consensus/types" } unused_port = { path = "common/unused_port" } +url = "2" +uuid = { version = "0.8", features = ["serde", "v4"] } validator_client = { path = "validator_client" } validator_dir = { path = "common/validator_dir" } validator_http_api = { path = "validator_client/http_api" } @@ -288,8 +271,12 @@ validator_metrics = { path = "validator_client/validator_metrics" } validator_services = { path = "validator_client/validator_services" } validator_store = { path = "validator_client/validator_store" } validator_test_rig = { path = "testing/validator_test_rig" } +warp = { version = "0.3.7", default-features = false, features = ["tls"] } warp_utils = { path = "common/warp_utils" } +workspace_members = { path = "common/workspace_members" } xdelta3 = { git = "http://github.com/sigp/xdelta3-rs", rev = "4db64086bb02e9febb584ba93b9d16bb2ae3825a" } +zeroize = { version = "1", features = ["zeroize_derive", "serde"] } +zip = "0.6" zstd = "0.13" [profile.maxperf] 
diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 30d6846964..596419c33e 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -11,9 +11,6 @@ edition = { workspace = true } name = "beacon_node" path = "src/lib.rs" -[dev-dependencies] -node_test_rig = { path = "../testing/node_test_rig" } - [features] write_ssz_files = [ "beacon_chain/write_ssz_files", @@ -45,3 +42,6 @@ task_executor = { workspace = true } tracing = { workspace = true } types = { workspace = true } unused_port = { workspace = true } + +[dev-dependencies] +node_test_rig = { path = "../testing/node_test_rig" } diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 18b40cab7e..1bf6ab4326 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -1,3 +1,4 @@ + [package] name = "beacon_chain" version = "0.2.0" @@ -5,10 +6,6 @@ authors = ["Paul Hauner ", "Age Manning { + chain: Arc>, + spec: Arc, +} + +#[cfg_attr(test, automock, allow(dead_code))] +impl FetchBlobsBeaconAdapter { + pub(crate) fn new(chain: Arc>) -> Self { + let spec = chain.spec.clone(); + Self { chain, spec } + } + + pub(crate) fn spec(&self) -> &Arc { + &self.spec + } + + pub(crate) fn kzg(&self) -> &Arc { + &self.chain.kzg + } + + pub(crate) fn executor(&self) -> &TaskExecutor { + &self.chain.task_executor + } + + pub(crate) async fn get_blobs_v1( + &self, + versioned_hashes: Vec, + ) -> Result>>, FetchEngineBlobError> { + let execution_layer = self + .chain + .execution_layer + .as_ref() + .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?; + + execution_layer + .get_blobs_v1(versioned_hashes) + .await + .map_err(FetchEngineBlobError::RequestFailed) + } + + pub(crate) async fn get_blobs_v2( + &self, + versioned_hashes: Vec, + ) -> Result>>, FetchEngineBlobError> { + let execution_layer = self + .chain + .execution_layer + .as_ref() + .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?; + + execution_layer + 
.get_blobs_v2(versioned_hashes) + .await + .map_err(FetchEngineBlobError::RequestFailed) + } + + pub(crate) fn verify_blob_for_gossip( + &self, + blob: &Arc>, + ) -> Result, GossipBlobError> { + GossipVerifiedBlob::::new(blob.clone(), blob.index, &self.chain) + } + + pub(crate) async fn process_engine_blobs( + &self, + slot: Slot, + block_root: Hash256, + blobs: EngineGetBlobsOutput, + ) -> Result { + self.chain + .process_engine_blobs(slot, block_root, blobs) + .await + .map_err(FetchEngineBlobError::BlobProcessingError) + } + + pub(crate) fn fork_choice_contains_block(&self, block_root: &Hash256) -> bool { + self.chain + .canonical_head + .fork_choice_read_lock() + .contains_block(block_root) + } +} diff --git a/beacon_node/beacon_chain/src/fetch_blobs.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs similarity index 78% rename from beacon_node/beacon_chain/src/fetch_blobs.rs rename to beacon_node/beacon_chain/src/fetch_blobs/mod.rs index d91f103b9d..ba798137b0 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs @@ -8,7 +8,13 @@ //! broadcasting blobs requires a much higher bandwidth, and is only done by high capacity //! supernodes. 
+mod fetch_blobs_beacon_adapter; +#[cfg(test)] +mod tests; + use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; +#[cfg_attr(test, double)] +use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter; use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::observed_data_sidecars::DoNotObserve; use crate::{ @@ -18,11 +24,13 @@ use crate::{ use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2}; use execution_layer::Error as ExecutionLayerError; use metrics::{inc_counter, TryExt}; +#[cfg(test)] +use mockall_double::double; use ssz_types::FixedVector; use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash; use std::collections::HashSet; use std::sync::Arc; -use tracing::debug; +use tracing::{debug, warn}; use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList}; use types::data_column_sidecar::DataColumnSidecarError; use types::{ @@ -58,6 +66,7 @@ pub enum FetchEngineBlobError { GossipBlob(GossipBlobError), RequestFailed(ExecutionLayerError), RuntimeShutdown, + TokioJoin(tokio::task::JoinError), } /// Fetches blobs from the EL mempool and processes them. It also broadcasts unseen blobs or @@ -68,6 +77,25 @@ pub async fn fetch_and_process_engine_blobs( block: Arc>>, custody_columns: HashSet, publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, +) -> Result, FetchEngineBlobError> { + fetch_and_process_engine_blobs_inner( + FetchBlobsBeaconAdapter::new(chain), + block_root, + block, + custody_columns, + publish_fn, + ) + .await +} + +/// Internal implementation of fetch blobs, which uses `FetchBlobsBeaconAdapter` instead of +/// `BeaconChain` for better testability. 
+async fn fetch_and_process_engine_blobs_inner( + chain_adapter: FetchBlobsBeaconAdapter, + block_root: Hash256, + block: Arc>>, + custody_columns: HashSet, + publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, ) -> Result, FetchEngineBlobError> { let versioned_hashes = if let Some(kzg_commitments) = block .message() @@ -90,9 +118,12 @@ pub async fn fetch_and_process_engine_blobs( "Fetching blobs from the EL" ); - if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) { + if chain_adapter + .spec() + .is_peer_das_enabled_for_epoch(block.epoch()) + { fetch_and_process_blobs_v2( - chain, + chain_adapter, block_root, block, versioned_hashes, @@ -101,32 +132,33 @@ pub async fn fetch_and_process_engine_blobs( ) .await } else { - fetch_and_process_blobs_v1(chain, block_root, block, versioned_hashes, publish_fn).await + fetch_and_process_blobs_v1( + chain_adapter, + block_root, + block, + versioned_hashes, + publish_fn, + ) + .await } } async fn fetch_and_process_blobs_v1( - chain: Arc>, + chain_adapter: FetchBlobsBeaconAdapter, block_root: Hash256, block: Arc>, versioned_hashes: Vec, publish_fn: impl Fn(BlobsOrDataColumns) + Send + Sized, ) -> Result, FetchEngineBlobError> { let num_expected_blobs = versioned_hashes.len(); - let execution_layer = chain - .execution_layer - .as_ref() - .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?; - metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64); debug!(num_expected_blobs, "Fetching blobs from the EL"); - let response = execution_layer + let response = chain_adapter .get_blobs_v1(versioned_hashes) .await .inspect_err(|_| { inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL); - }) - .map_err(FetchEngineBlobError::RequestFailed)?; + })?; let num_fetched_blobs = response.iter().filter(|opt| opt.is_some()).count(); metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64); @@ -148,7 +180,7 @@ async fn fetch_and_process_blobs_v1( response, signed_block_header, 
&kzg_commitments_proof, - &chain.spec, + chain_adapter.spec(), )?; // Gossip verify blobs before publishing. This prevents blobs with invalid KZG proofs from @@ -160,7 +192,7 @@ async fn fetch_and_process_blobs_v1( .iter() .filter_map(|opt_blob| { let blob = opt_blob.as_ref()?; - match GossipVerifiedBlob::::new(blob.clone(), blob.index, &chain) { + match chain_adapter.verify_blob_for_gossip(blob) { Ok(verified) => Some(Ok(verified)), // Ignore already seen blobs. Err(GossipBlobError::RepeatBlob { .. }) => None, @@ -176,20 +208,19 @@ async fn fetch_and_process_blobs_v1( debug!(num_fetched_blobs, "Processing engine blobs"); - let availability_processing_status = chain + let availability_processing_status = chain_adapter .process_engine_blobs( block.slot(), block_root, EngineGetBlobsOutput::Blobs(fixed_blob_sidecar_list.clone()), ) - .await - .map_err(FetchEngineBlobError::BlobProcessingError)?; + .await?; Ok(Some(availability_processing_status)) } async fn fetch_and_process_blobs_v2( - chain: Arc>, + chain_adapter: FetchBlobsBeaconAdapter, block_root: Hash256, block: Arc>, versioned_hashes: Vec, @@ -197,52 +228,49 @@ async fn fetch_and_process_blobs_v2( publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, ) -> Result, FetchEngineBlobError> { let num_expected_blobs = versioned_hashes.len(); - let execution_layer = chain - .execution_layer - .as_ref() - .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?; metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64); debug!(num_expected_blobs, "Fetching blobs from the EL"); - let response = execution_layer + let response = chain_adapter .get_blobs_v2(versioned_hashes) .await .inspect_err(|_| { inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL); - }) - .map_err(FetchEngineBlobError::RequestFailed)?; + })?; - let (blobs, proofs): (Vec<_>, Vec<_>) = response + let Some(blobs_and_proofs) = response else { + debug!(num_expected_blobs, "No blobs fetched from the EL"); + 
inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL); + return Ok(None); + }; + + let (blobs, proofs): (Vec<_>, Vec<_>) = blobs_and_proofs .into_iter() - .filter_map(|blob_and_proof_opt| { - blob_and_proof_opt.map(|blob_and_proof| { - let BlobAndProofV2 { blob, proofs } = blob_and_proof; - (blob, proofs) - }) + .map(|blob_and_proof| { + let BlobAndProofV2 { blob, proofs } = blob_and_proof; + (blob, proofs) }) .unzip(); let num_fetched_blobs = blobs.len(); metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64); - // Partial blobs response isn't useful for PeerDAS, so we don't bother building and publishing data columns. if num_fetched_blobs != num_expected_blobs { - debug!( - info = "Unable to compute data columns", - num_fetched_blobs, num_expected_blobs, "Not all blobs fetched from the EL" + // This scenario is not supposed to happen if the EL is spec compliant. + // It should either return all requested blobs or none, but NOT partial responses. + // If we attempt to compute columns with partial blobs, we'd end up with invalid columns. + warn!( + num_fetched_blobs, + num_expected_blobs, "The EL did not return all requested blobs" ); inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL); return Ok(None); - } else { - inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL); } - if chain - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) - { - // Avoid computing columns if block has already been imported. + inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL); + + if chain_adapter.fork_choice_contains_block(&block_root) { + // Avoid computing columns if the block has already been imported. 
debug!( info = "block has already been imported", "Ignoring EL blobs response" @@ -251,7 +279,7 @@ async fn fetch_and_process_blobs_v2( } let custody_columns = compute_and_publish_data_columns( - &chain, + &chain_adapter, block.clone(), blobs, proofs, @@ -262,29 +290,30 @@ async fn fetch_and_process_blobs_v2( debug!(num_fetched_blobs, "Processing engine blobs"); - let availability_processing_status = chain + let availability_processing_status = chain_adapter .process_engine_blobs( block.slot(), block_root, EngineGetBlobsOutput::CustodyColumns(custody_columns), ) - .await - .map_err(FetchEngineBlobError::BlobProcessingError)?; + .await?; Ok(Some(availability_processing_status)) } /// Offload the data column computation to a blocking task to avoid holding up the async runtime. async fn compute_and_publish_data_columns( - chain: &Arc>, + chain_adapter: &FetchBlobsBeaconAdapter, block: Arc>>, blobs: Vec>, proofs: Vec>, custody_columns_indices: HashSet, publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, ) -> Result, FetchEngineBlobError> { - let chain_cloned = chain.clone(); - chain + let kzg = chain_adapter.kzg().clone(); + let spec = chain_adapter.spec().clone(); + chain_adapter + .executor() .spawn_blocking_handle( move || { let mut timer = metrics::start_timer_vec( @@ -294,14 +323,9 @@ async fn compute_and_publish_data_columns( let blob_refs = blobs.iter().collect::>(); let cell_proofs = proofs.into_iter().flatten().collect(); - let data_columns_result = blobs_to_data_column_sidecars( - &blob_refs, - cell_proofs, - &block, - &chain_cloned.kzg, - &chain_cloned.spec, - ) - .discard_timer_on_break(&mut timer); + let data_columns_result = + blobs_to_data_column_sidecars(&blob_refs, cell_proofs, &block, &kzg, &spec) + .discard_timer_on_break(&mut timer); drop(timer); // This filtering ensures we only import and publish the custody columns. 
@@ -319,9 +343,9 @@ async fn compute_and_publish_data_columns( }, "compute_and_publish_data_columns", ) + .ok_or(FetchEngineBlobError::RuntimeShutdown)? .await - .map_err(|e| FetchEngineBlobError::BeaconChainError(Box::new(e))) - .and_then(|r| r) + .map_err(FetchEngineBlobError::TokioJoin)? } fn build_blob_sidecars( diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs new file mode 100644 index 0000000000..be3d29e9c9 --- /dev/null +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -0,0 +1,278 @@ +use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter; +use crate::fetch_blobs::{ + fetch_and_process_engine_blobs_inner, BlobsOrDataColumns, FetchEngineBlobError, +}; +use crate::test_utils::{get_kzg, EphemeralHarnessType}; +use crate::AvailabilityProcessingStatus; +use bls::Signature; +use eth2::types::BlobsBundle; +use execution_layer::json_structures::BlobAndProofV2; +use execution_layer::test_utils::generate_blobs; +use maplit::hashset; +use std::sync::{Arc, Mutex}; +use task_executor::test_utils::TestRuntime; +use types::{ + BeaconBlockFulu, EmptyBlock, EthSpec, ForkName, Hash256, MainnetEthSpec, SignedBeaconBlock, + SignedBeaconBlockFulu, +}; + +type E = MainnetEthSpec; +type T = EphemeralHarnessType; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_fetch_blobs_v2_no_blobs_in_block() { + let mut mock_adapter = mock_beacon_adapter(); + let (publish_fn, _s) = mock_publish_fn(); + let block = SignedBeaconBlock::::Fulu(SignedBeaconBlockFulu { + message: BeaconBlockFulu::empty(mock_adapter.spec()), + signature: Signature::empty(), + }); + let block_root = block.canonical_root(); + + // Expectations: engine fetch blobs should not be triggered + mock_adapter.expect_get_blobs_v2().times(0); + mock_adapter.expect_process_engine_blobs().times(0); + + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( 
+ mock_adapter, + block_root, + Arc::new(block), + custody_columns.clone(), + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); + + assert_eq!(processing_status, None); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_fetch_blobs_v2_no_blobs_returned() { + let mut mock_adapter = mock_beacon_adapter(); + let (publish_fn, _) = mock_publish_fn(); + let (block, _blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter); + let block_root = block.canonical_root(); + + // No blobs in EL response + mock_get_blobs_v2_response(&mut mock_adapter, None); + + // Trigger fetch blobs on the block + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + block, + custody_columns.clone(), + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); + + assert_eq!(processing_status, None); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_fetch_blobs_v2_partial_blobs_returned() { + let mut mock_adapter = mock_beacon_adapter(); + let (publish_fn, publish_fn_args) = mock_publish_fn(); + let (block, mut blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter); + let block_root = block.canonical_root(); + + // Missing blob in EL response + blobs_and_proofs.pop(); + mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs)); + // No blobs should be processed + mock_adapter.expect_process_engine_blobs().times(0); + + // Trigger fetch blobs on the block + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + block, + custody_columns.clone(), + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); + + assert_eq!(processing_status, None); + assert_eq!( + publish_fn_args.lock().unwrap().len(), + 0, + "no columns should be published" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn 
test_fetch_blobs_v2_block_imported_after_el_response() { + let mut mock_adapter = mock_beacon_adapter(); + let (publish_fn, publish_fn_args) = mock_publish_fn(); + let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter); + let block_root = block.canonical_root(); + + // All blobs returned, but fork choice already imported the block + mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs)); + mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]); + // No blobs should be processed + mock_adapter.expect_process_engine_blobs().times(0); + + // Trigger fetch blobs on the block + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + block, + custody_columns.clone(), + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); + + assert_eq!(processing_status, None); + assert_eq!( + publish_fn_args.lock().unwrap().len(), + 0, + "no columns should be published" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_fetch_blobs_v2_success() { + let mut mock_adapter = mock_beacon_adapter(); + let (publish_fn, publish_fn_args) = mock_publish_fn(); + let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter); + let block_root = block.canonical_root(); + + // All blobs returned, fork choice doesn't contain block + mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs)); + mock_fork_choice_contains_block(&mut mock_adapter, vec![]); + mock_process_engine_blobs_result( + &mut mock_adapter, + Ok(AvailabilityProcessingStatus::Imported(block_root)), + ); + + // Trigger fetch blobs on the block + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + block, + custody_columns.clone(), + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); + + assert_eq!( + processing_status, + 
Some(AvailabilityProcessingStatus::Imported(block_root)) + ); + + let published_columns = extract_published_blobs(publish_fn_args); + assert!( + matches!( + published_columns, + BlobsOrDataColumns::DataColumns (columns) if columns.len() == custody_columns.len() + ), + "should publish custody columns" + ); +} + +/// Extract the `BlobsOrDataColumns` passed to the `publish_fn`. +fn extract_published_blobs( + publish_fn_args: Arc>>>, +) -> BlobsOrDataColumns { + let mut calls = publish_fn_args.lock().unwrap(); + assert_eq!(calls.len(), 1); + calls.pop().unwrap() +} + +fn mock_process_engine_blobs_result( + mock_adapter: &mut MockFetchBlobsBeaconAdapter, + result: Result, +) { + mock_adapter + .expect_process_engine_blobs() + .return_once(move |_, _, _| result); +} + +fn mock_fork_choice_contains_block( + mock_adapter: &mut MockFetchBlobsBeaconAdapter, + block_roots: Vec, +) { + mock_adapter + .expect_fork_choice_contains_block() + .returning(move |block_root| block_roots.contains(block_root)); +} + +fn mock_get_blobs_v2_response( + mock_adapter: &mut MockFetchBlobsBeaconAdapter, + blobs_and_proofs_opt: Option>>, +) { + mock_adapter + .expect_get_blobs_v2() + .return_once(move |_| Ok(blobs_and_proofs_opt)); +} + +fn create_test_block_and_blobs( + mock_adapter: &MockFetchBlobsBeaconAdapter, +) -> (Arc>, Vec>) { + let mut block = SignedBeaconBlock::Fulu(SignedBeaconBlockFulu { + message: BeaconBlockFulu::empty(mock_adapter.spec()), + signature: Signature::empty(), + }); + let (blobs_bundle, _tx) = generate_blobs::(2, block.fork_name_unchecked()).unwrap(); + let BlobsBundle { + commitments, + proofs, + blobs, + } = blobs_bundle; + + *block + .message_mut() + .body_mut() + .blob_kzg_commitments_mut() + .unwrap() = commitments; + + let proofs_len = proofs.len() / blobs.len(); + let blob_and_proofs: Vec> = blobs + .into_iter() + .zip(proofs.chunks(proofs_len)) + .map(|(blob, proofs)| BlobAndProofV2 { + blob, + proofs: proofs.to_vec().into(), + }) + .collect(); + 
(Arc::new(block), blob_and_proofs) +} + +#[allow(clippy::type_complexity)] +fn mock_publish_fn() -> ( + impl Fn(BlobsOrDataColumns) + Send + 'static, + Arc>>>, +) { + // Keep track of the arguments captured by `publish_fn`. + let captured_args = Arc::new(Mutex::new(vec![])); + let captured_args_clone = captured_args.clone(); + let publish_fn = move |args| { + let mut lock = captured_args_clone.lock().unwrap(); + lock.push(args); + }; + (publish_fn, captured_args) +} + +fn mock_beacon_adapter() -> MockFetchBlobsBeaconAdapter { + let test_runtime = TestRuntime::default(); + let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); + let kzg = get_kzg(&spec); + + let mut mock_adapter = MockFetchBlobsBeaconAdapter::default(); + mock_adapter.expect_spec().return_const(spec.clone()); + mock_adapter.expect_kzg().return_const(kzg.clone()); + mock_adapter + .expect_executor() + .return_const(test_runtime.task_executor.clone()); + mock_adapter +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 858aaafcf0..1c388c06d4 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -69,8 +69,6 @@ use types::{typenum::U4294967296, *}; pub const HARNESS_GENESIS_TIME: u64 = 1_567_552_690; // Environment variable to read if `fork_from_env` feature is enabled. pub const FORK_NAME_ENV_VAR: &str = "FORK_NAME"; -// Environment variable to read if `ci_logger` feature is enabled. 
-pub const CI_LOGGER_DIR_ENV_VAR: &str = "CI_LOGGER_DIR"; // Pre-computed data column sidecar using a single static blob from: // `beacon_node/execution_layer/src/test_utils/fixtures/mainnet/test_blobs_bundle.ssz` @@ -2674,10 +2672,7 @@ where mut latest_block_hash: Option, sync_committee_strategy: SyncCommitteeStrategy, ) -> AddBlocksResult { - assert!( - slots.windows(2).all(|w| w[0] <= w[1]), - "Slots have to be sorted" - ); // slice.is_sorted() isn't stabilized at the moment of writing this + assert!(slots.is_sorted(), "Slots have to be in ascending order"); let mut block_hash_from_slot: HashMap = HashMap::new(); let mut state_hash_from_slot: HashMap = HashMap::new(); for slot in slots { @@ -2717,10 +2712,7 @@ where mut latest_block_hash: Option, sync_committee_strategy: SyncCommitteeStrategy, ) -> AddBlocksResult { - assert!( - slots.windows(2).all(|w| w[0] <= w[1]), - "Slots have to be sorted" - ); // slice.is_sorted() isn't stabilized at the moment of writing this + assert!(slots.is_sorted(), "Slots have to be in ascending order"); let mut block_hash_from_slot: HashMap = HashMap::new(); let mut state_hash_from_slot: HashMap = HashMap::new(); for slot in slots { diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 195c53c4a0..379b46b4b1 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -4,12 +4,6 @@ version = "0.2.0" authors = ["Sigma Prime "] edition = { workspace = true } -[dev-dependencies] -operation_pool = { workspace = true } -serde_yaml = { workspace = true } -state_processing = { workspace = true } -tokio = { workspace = true } - [dependencies] beacon_chain = { workspace = true } beacon_processor = { workspace = true } @@ -46,3 +40,9 @@ tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } types = { workspace = true } + +[dev-dependencies] +operation_pool = { workspace = true } +serde_yaml = { workspace = true } +state_processing = { workspace = 
true } +tokio = { workspace = true } diff --git a/beacon_node/eth1/Cargo.toml b/beacon_node/eth1/Cargo.toml index fa08364251..f834ad7eef 100644 --- a/beacon_node/eth1/Cargo.toml +++ b/beacon_node/eth1/Cargo.toml @@ -4,11 +4,6 @@ version = "0.2.0" authors = ["Paul Hauner "] edition = { workspace = true } -[dev-dependencies] -environment = { workspace = true } -eth1_test_rig = { workspace = true } -serde_yaml = { workspace = true } - [dependencies] eth2 = { workspace = true } ethereum_ssz = { workspace = true } @@ -28,3 +23,8 @@ tokio = { workspace = true } tracing = { workspace = true } tree_hash = { workspace = true } types = { workspace = true } + +[dev-dependencies] +environment = { workspace = true } +eth1_test_rig = { workspace = true } +serde_yaml = { workspace = true } diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index bf4c391a8d..300713fdca 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -727,7 +727,7 @@ impl HttpJsonRpc { pub async fn get_blobs_v2( &self, versioned_hashes: Vec, - ) -> Result>>, Error> { + ) -> Result>>, Error> { let params = json!([versioned_hashes]); self.rpc_request( @@ -1242,6 +1242,10 @@ impl HttpJsonRpc { } else { let engine_version = self.get_client_version_v1().await?; *lock = Some(CachedResponse::new(engine_version.clone())); + if !engine_version.is_empty() { + // reset metric gauge when there's a fresh fetch + crate::metrics::reset_execution_layer_info_gauge(); + } Ok(engine_version) } } diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index ddd8bb5008..cf751138d6 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -129,8 +129,7 @@ impl TryFrom> for ProvenancedPayload ExecutionLayer { &self, age_limit: Option, ) -> Result, Error> { - self.engine() + let versions = self + .engine() 
.request(|engine| engine.get_engine_version(age_limit)) .await - .map_err(Into::into) + .map_err(Into::::into)?; + metrics::expose_execution_layer_info(&versions); + + Ok(versions) } /// Used during block production to determine if the merge has been triggered. @@ -1861,7 +1864,7 @@ impl ExecutionLayer { pub async fn get_blobs_v2( &self, query: Vec, - ) -> Result>>, Error> { + ) -> Result>>, Error> { let capabilities = self.get_engine_capabilities(None).await?; if capabilities.get_blobs_v2 { diff --git a/beacon_node/execution_layer/src/metrics.rs b/beacon_node/execution_layer/src/metrics.rs index ab1a22677f..aba8434c8e 100644 --- a/beacon_node/execution_layer/src/metrics.rs +++ b/beacon_node/execution_layer/src/metrics.rs @@ -116,3 +116,29 @@ pub static EXECUTION_LAYER_PAYLOAD_BIDS: LazyLock> = LazyLoc &["source"] ) }); +pub static EXECUTION_LAYER_INFO: LazyLock> = LazyLock::new(|| { + try_create_int_gauge_vec( + "execution_layer_info", + "The build of the execution layer connected to lighthouse", + &["code", "name", "version", "commit"], + ) +}); + +pub fn reset_execution_layer_info_gauge() { + let _ = EXECUTION_LAYER_INFO.as_ref().map(|gauge| gauge.reset()); +} + +pub fn expose_execution_layer_info(els: &Vec) { + for el in els { + set_gauge_vec( + &EXECUTION_LAYER_INFO, + &[ + &el.code.to_string(), + &el.name, + &el.version, + &el.commit.to_string(), + ], + 1, + ); + } +} diff --git a/beacon_node/genesis/Cargo.toml b/beacon_node/genesis/Cargo.toml index 6ba8998a01..f752b888a7 100644 --- a/beacon_node/genesis/Cargo.toml +++ b/beacon_node/genesis/Cargo.toml @@ -4,11 +4,6 @@ version = "0.2.0" authors = ["Paul Hauner "] edition = { workspace = true } -[dev-dependencies] -eth1_test_rig = { workspace = true } -logging = { workspace = true } -sensitive_url = { workspace = true } - [dependencies] environment = { workspace = true } eth1 = { workspace = true } @@ -23,3 +18,8 @@ tokio = { workspace = true } tracing = { workspace = true } tree_hash = { workspace = true } 
types = { workspace = true } + +[dev-dependencies] +eth1_test_rig = { workspace = true } +logging = { workspace = true } +sensitive_url = { workspace = true } diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index 4f1825af20..3ee967eeee 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -4,6 +4,9 @@ version = "0.2.0" authors = ["Sigma Prime "] edition = { workspace = true } +[features] +libp2p-websocket = [] + [dependencies] alloy-primitives = { workspace = true } alloy-rlp = { workspace = true } @@ -53,7 +56,21 @@ unused_port = { workspace = true } [dependencies.libp2p] version = "0.55" default-features = false -features = ["identify", "yamux", "noise", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa", "metrics", "quic", "upnp"] +features = [ + "identify", + "yamux", + "noise", + "dns", + "tcp", + "tokio", + "plaintext", + "secp256k1", + "macros", + "ecdsa", + "metrics", + "quic", + "upnp", +] [dev-dependencies] async-channel = { workspace = true } @@ -61,6 +78,3 @@ logging = { workspace = true } quickcheck = { workspace = true } quickcheck_macros = { workspace = true } tempfile = { workspace = true } - -[features] -libp2p-websocket = [] diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 9b43e8b581..72d7aa0074 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -12,7 +12,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::runtime::Runtime; use tokio::time::sleep; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info_span, warn, Instrument}; use types::{ BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockBellatrix, BlobSidecar, ChainSpec, EmptyBlock, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, MinimalEthSpec, @@ -55,7 +55,7 @@ fn 
bellatrix_block_large(spec: &ChainSpec) -> BeaconBlock { fn test_tcp_status_rpc() { // Set up the logging. let log_level = "debug"; - let enable_logging = false; + let enable_logging = true; build_tracing_subscriber(log_level, enable_logging); let rt = Arc::new(Runtime::new().unwrap()); @@ -117,7 +117,8 @@ fn test_tcp_status_rpc() { _ => {} } } - }; + } + .instrument(info_span!("Sender")); // build the receiver future let receiver_future = async { @@ -141,7 +142,8 @@ fn test_tcp_status_rpc() { _ => {} // Ignore other events } } - }; + } + .instrument(info_span!("Receiver")); tokio::select! { _ = sender_future => {} @@ -159,7 +161,7 @@ fn test_tcp_status_rpc() { fn test_tcp_blocks_by_range_chunked_rpc() { // Set up the logging. let log_level = "debug"; - let enable_logging = false; + let enable_logging = true; build_tracing_subscriber(log_level, enable_logging); let messages_to_send = 6; @@ -245,7 +247,8 @@ fn test_tcp_blocks_by_range_chunked_rpc() { _ => {} // Ignore other behaviour events } } - }; + } + .instrument(info_span!("Sender")); // build the receiver future let receiver_future = async { @@ -286,7 +289,8 @@ fn test_tcp_blocks_by_range_chunked_rpc() { _ => {} // Ignore other events } } - }; + } + .instrument(info_span!("Receiver")); tokio::select! { _ = sender_future => {} @@ -304,7 +308,7 @@ fn test_tcp_blocks_by_range_chunked_rpc() { fn test_blobs_by_range_chunked_rpc() { // Set up the logging. let log_level = "debug"; - let enable_logging = false; + let enable_logging = true; build_tracing_subscriber(log_level, enable_logging); let slot_count = 32; @@ -373,7 +377,8 @@ fn test_blobs_by_range_chunked_rpc() { _ => {} // Ignore other behaviour events } } - }; + } + .instrument(info_span!("Sender")); // build the receiver future let receiver_future = async { @@ -407,7 +412,8 @@ fn test_blobs_by_range_chunked_rpc() { _ => {} // Ignore other events } } - }; + } + .instrument(info_span!("Receiver")); tokio::select! 
{ _ = sender_future => {} @@ -425,7 +431,7 @@ fn test_blobs_by_range_chunked_rpc() { fn test_tcp_blocks_by_range_over_limit() { // Set up the logging. let log_level = "debug"; - let enable_logging = false; + let enable_logging = true; build_tracing_subscriber(log_level, enable_logging); let messages_to_send = 5; @@ -479,7 +485,8 @@ fn test_tcp_blocks_by_range_over_limit() { _ => {} // Ignore other behaviour events } } - }; + } + .instrument(info_span!("Sender")); // build the receiver future let receiver_future = async { @@ -512,7 +519,8 @@ fn test_tcp_blocks_by_range_over_limit() { _ => {} // Ignore other events } } - }; + } + .instrument(info_span!("Receiver")); tokio::select! { _ = sender_future => {} @@ -529,7 +537,7 @@ fn test_tcp_blocks_by_range_over_limit() { fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() { // Set up the logging. let log_level = "debug"; - let enable_logging = false; + let enable_logging = true; build_tracing_subscriber(log_level, enable_logging); let messages_to_send = 10; @@ -601,7 +609,8 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() { _ => {} // Ignore other behaviour events } } - }; + } + .instrument(info_span!("Sender")); // determine messages to send (PeerId, RequestId). If some, indicates we still need to send // messages @@ -648,7 +657,8 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() { } } } - }; + } + .instrument(info_span!("Receiver")); tokio::select! { _ = sender_future => {} @@ -666,7 +676,7 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() { fn test_tcp_blocks_by_range_single_empty_rpc() { // Set up the logging. 
let log_level = "trace"; - let enable_logging = false; + let enable_logging = true; build_tracing_subscriber(log_level, enable_logging); let rt = Arc::new(Runtime::new().unwrap()); @@ -734,7 +744,8 @@ fn test_tcp_blocks_by_range_single_empty_rpc() { _ => {} // Ignore other behaviour events } } - }; + } + .instrument(info_span!("Sender")); // build the receiver future let receiver_future = async { @@ -767,7 +778,8 @@ fn test_tcp_blocks_by_range_single_empty_rpc() { _ => {} // Ignore other events } } - }; + } + .instrument(info_span!("Receiver")); tokio::select! { _ = sender_future => {} _ = receiver_future => {} @@ -787,7 +799,7 @@ fn test_tcp_blocks_by_range_single_empty_rpc() { fn test_tcp_blocks_by_root_chunked_rpc() { // Set up the logging. let log_level = "debug"; - let enable_logging = false; + let enable_logging = true; build_tracing_subscriber(log_level, enable_logging); let messages_to_send = 6; @@ -877,7 +889,8 @@ fn test_tcp_blocks_by_root_chunked_rpc() { _ => {} // Ignore other behaviour events } } - }; + } + .instrument(info_span!("Sender")); // build the receiver future let receiver_future = async { @@ -916,7 +929,8 @@ fn test_tcp_blocks_by_root_chunked_rpc() { _ => {} // Ignore other events } } - }; + } + .instrument(info_span!("Receiver")); tokio::select! { _ = sender_future => {} _ = receiver_future => {} @@ -932,7 +946,7 @@ fn test_tcp_blocks_by_root_chunked_rpc() { fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() { // Set up the logging. let log_level = "debug"; - let enable_logging = false; + let enable_logging = true; build_tracing_subscriber(log_level, enable_logging); let messages_to_send: u64 = 10; @@ -1015,7 +1029,8 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() { _ => {} // Ignore other behaviour events } } - }; + } + .instrument(info_span!("Sender")); // determine messages to send (PeerId, RequestId). 
If some, indicates we still need to send // messages @@ -1062,7 +1077,8 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() { } } } - }; + } + .instrument(info_span!("Receiver")); tokio::select! { _ = sender_future => {} @@ -1115,7 +1131,8 @@ fn goodbye_test(log_level: &str, enable_logging: bool, protocol: Protocol) { _ => {} // Ignore other RPC messages } } - }; + } + .instrument(info_span!("Sender")); // build the receiver future let receiver_future = async { @@ -1125,7 +1142,8 @@ fn goodbye_test(log_level: &str, enable_logging: bool, protocol: Protocol) { return; } } - }; + } + .instrument(info_span!("Receiver")); let total_future = futures::future::join(sender_future, receiver_future); @@ -1143,7 +1161,7 @@ fn goodbye_test(log_level: &str, enable_logging: bool, protocol: Protocol) { #[allow(clippy::single_match)] fn tcp_test_goodbye_rpc() { let log_level = "debug"; - let enabled_logging = false; + let enabled_logging = true; goodbye_test(log_level, enabled_logging, Protocol::Tcp); } @@ -1152,13 +1170,15 @@ fn tcp_test_goodbye_rpc() { #[allow(clippy::single_match)] fn quic_test_goodbye_rpc() { let log_level = "debug"; - let enabled_logging = false; + let enabled_logging = true; goodbye_test(log_level, enabled_logging, Protocol::Quic); } // Test that the receiver delays the responses during response rate-limiting. #[test] fn test_delayed_rpc_response() { + // Set up the logging. + build_tracing_subscriber("debug", true); let rt = Arc::new(Runtime::new().unwrap()); let spec = Arc::new(E::default_spec()); @@ -1214,7 +1234,7 @@ fn test_delayed_rpc_response() { app_request_id: _, response, } => { - debug!(%request_id, "Sender received"); + debug!(%request_id, elapsed = ?request_sent_at.elapsed(), "Sender received response"); assert_eq!(response, rpc_response); match request_id { @@ -1289,6 +1309,8 @@ fn test_delayed_rpc_response() { // once, thanks to the self-limiter on the sender side. #[test] fn test_active_requests() { + // Set up the logging. 
+ build_tracing_subscriber("debug", true); let rt = Arc::new(Runtime::new().unwrap()); let spec = Arc::new(E::default_spec()); diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 4e36953880..cdb6ba7a83 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -4,17 +4,12 @@ version = "0.2.0" authors = ["Sigma Prime "] edition = { workspace = true } -[dev-dependencies] -bls = { workspace = true } -eth2 = { workspace = true } -eth2_network_config = { workspace = true } -genesis = { workspace = true } -gossipsub = { workspace = true } -k256 = "0.13.4" -kzg = { workspace = true } -matches = "0.1.8" -rand_chacha = "0.3.1" -serde_json = { workspace = true } +[features] +# NOTE: This can be run via cargo build --bin lighthouse --features network/disable-backfill +disable-backfill = [] +fork_from_env = ["beacon_chain/fork_from_env"] +portable = ["beacon_chain/portable"] +test_logger = [] [dependencies] alloy-primitives = { workspace = true } @@ -51,10 +46,14 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } types = { workspace = true } -[features] -# NOTE: This can be run via cargo build --bin lighthouse --features network/disable-backfill -disable-backfill = [] -fork_from_env = ["beacon_chain/fork_from_env"] -portable = ["beacon_chain/portable"] -test_logger = [] -ci_logger = [] +[dev-dependencies] +bls = { workspace = true } +eth2 = { workspace = true } +eth2_network_config = { workspace = true } +genesis = { workspace = true } +gossipsub = { workspace = true } +k256 = "0.13.4" +kzg = { workspace = true } +matches = "0.1.8" +rand_chacha = "0.3.1" +serde_json = { workspace = true } diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 5863091cf0..38095ec434 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -107,6 +107,8 @@ impl TestRig { // deterministic seed let rng = 
ChaCha20Rng::from_seed([0u8; 32]); + init_tracing(); + TestRig { beacon_processor_rx, beacon_processor_rx_queue: vec![], diff --git a/beacon_node/network/src/sync/tests/mod.rs b/beacon_node/network/src/sync/tests/mod.rs index ec24ddb036..3dca457108 100644 --- a/beacon_node/network/src/sync/tests/mod.rs +++ b/beacon_node/network/src/sync/tests/mod.rs @@ -9,9 +9,14 @@ use beacon_processor::WorkEvent; use lighthouse_network::NetworkGlobals; use rand_chacha::ChaCha20Rng; use slot_clock::ManualSlotClock; -use std::sync::Arc; +use std::fs::OpenOptions; +use std::io::Write; +use std::sync::{Arc, Once}; use store::MemoryStore; use tokio::sync::mpsc; +use tracing_subscriber::fmt::MakeWriter; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; use types::{ChainSpec, ForkName, MinimalEthSpec as E}; mod lookups; @@ -65,3 +70,55 @@ struct TestRig { fork_name: ForkName, spec: Arc, } + +// Environment variable to read if `fork_from_env` feature is enabled. +pub const FORK_NAME_ENV_VAR: &str = "FORK_NAME"; +// Environment variable specifying the log output directory in CI. +pub const CI_LOGGER_DIR_ENV_VAR: &str = "CI_LOGGER_DIR"; + +static INIT_TRACING: Once = Once::new(); + +pub fn init_tracing() { + INIT_TRACING.call_once(|| { + if std::env::var(CI_LOGGER_DIR_ENV_VAR).is_ok() { + // Enable logging to log files for each test and each fork. + tracing_subscriber::registry() + .with( + tracing_subscriber::fmt::layer() + .with_ansi(false) + .with_writer(CILogWriter), + ) + .init(); + } + }); +} + +// CILogWriter writes logs to separate files for each test and each fork. +struct CILogWriter; + +impl<'a> MakeWriter<'a> for CILogWriter { + type Writer = Box; + + // fmt::Layer calls this method each time an event is recorded. 
+ fn make_writer(&'a self) -> Self::Writer { + let log_dir = std::env::var(CI_LOGGER_DIR_ENV_VAR).unwrap(); + let fork_name = std::env::var(FORK_NAME_ENV_VAR) + .map(|s| format!("{s}_")) + .unwrap_or_default(); + + // The current test name can be obtained via the thread name. + let test_name = std::thread::current() + .name() + .unwrap_or("unnamed") + .replace(|c: char| !c.is_alphanumeric(), "_"); + + let file_path = format!("{log_dir}/{fork_name}{test_name}.log"); + let file = OpenOptions::new() + .append(true) + .create(true) + .open(&file_path) + .expect("failed to open a log file"); + + Box::new(file) + } +} diff --git a/beacon_node/operation_pool/Cargo.toml b/beacon_node/operation_pool/Cargo.toml index 570b74226c..beaf818882 100644 --- a/beacon_node/operation_pool/Cargo.toml +++ b/beacon_node/operation_pool/Cargo.toml @@ -4,6 +4,9 @@ version = "0.2.0" authors = ["Michael Sproul "] edition = { workspace = true } +[features] +portable = ["beacon_chain/portable"] + [dependencies] bitvec = { workspace = true } derivative = { workspace = true } @@ -23,6 +26,3 @@ types = { workspace = true } beacon_chain = { workspace = true } maplit = { workspace = true } tokio = { workspace = true } - -[features] -portable = ["beacon_chain/portable"] diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 908f0759a9..13df83efab 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -9,12 +9,6 @@ default = ["leveldb"] leveldb = ["dep:leveldb"] redb = ["dep:redb"] -[dev-dependencies] -beacon_chain = { workspace = true } -criterion = { workspace = true } -rand = { workspace = true, features = ["small_rng"] } -tempfile = { workspace = true } - [dependencies] bls = { workspace = true } db-key = "0.0.5" @@ -40,6 +34,12 @@ types = { workspace = true } xdelta3 = { workspace = true } zstd = { workspace = true } +[dev-dependencies] +beacon_chain = { workspace = true } +criterion = { workspace = true } +rand = { workspace = true, features = 
["small_rng"] } +tempfile = { workspace = true } + [[bench]] name = "hdiff" harness = false diff --git a/common/compare_fields/Cargo.toml b/common/compare_fields/Cargo.toml index 9972ca75ca..50e7e5f21d 100644 --- a/common/compare_fields/Cargo.toml +++ b/common/compare_fields/Cargo.toml @@ -4,11 +4,11 @@ version = "0.2.0" authors = ["Paul Hauner "] edition = { workspace = true } +[package.metadata.cargo-udeps.ignore] +development = ["compare_fields_derive"] # used in doc-tests + [dependencies] itertools = { workspace = true } [dev-dependencies] compare_fields_derive = { workspace = true } - -[package.metadata.cargo-udeps.ignore] -development = ["compare_fields_derive"] # used in doc-tests diff --git a/common/deposit_contract/Cargo.toml b/common/deposit_contract/Cargo.toml index 953fde1af7..767f67b853 100644 --- a/common/deposit_contract/Cargo.toml +++ b/common/deposit_contract/Cargo.toml @@ -6,14 +6,14 @@ edition = { workspace = true } build = "build.rs" -[build-dependencies] -hex = { workspace = true } -reqwest = { workspace = true } -serde_json = { workspace = true } -sha2 = { workspace = true } - [dependencies] ethabi = "16.0.0" ethereum_ssz = { workspace = true } tree_hash = { workspace = true } types = { workspace = true } + +[build-dependencies] +hex = { workspace = true } +reqwest = { workspace = true } +serde_json = { workspace = true } +sha2 = { workspace = true } diff --git a/common/eth2/Cargo.toml b/common/eth2/Cargo.toml index 5d0ad1f45e..81666a6421 100644 --- a/common/eth2/Cargo.toml +++ b/common/eth2/Cargo.toml @@ -4,6 +4,10 @@ version = "0.1.0" authors = ["Paul Hauner "] edition = { workspace = true } +[features] +default = ["lighthouse"] +lighthouse = [] + [dependencies] derivative = { workspace = true } either = { workspace = true } @@ -33,7 +37,3 @@ zeroize = { workspace = true } [dev-dependencies] tokio = { workspace = true } - -[features] -default = ["lighthouse"] -lighthouse = [] diff --git a/common/eth2_network_config/Cargo.toml 
b/common/eth2_network_config/Cargo.toml index da6c4dfd95..ec5b0cc1d7 100644 --- a/common/eth2_network_config/Cargo.toml +++ b/common/eth2_network_config/Cargo.toml @@ -6,15 +6,6 @@ edition = { workspace = true } build = "build.rs" -[build-dependencies] -eth2_config = { workspace = true } -zip = { workspace = true } - -[dev-dependencies] -ethereum_ssz = { workspace = true } -tempfile = { workspace = true } -tokio = { workspace = true } - [dependencies] bytes = { workspace = true } discv5 = { workspace = true } @@ -28,3 +19,12 @@ sha2 = { workspace = true } tracing = { workspace = true } types = { workspace = true } url = { workspace = true } + +[build-dependencies] +eth2_config = { workspace = true } +zip = { workspace = true } + +[dev-dependencies] +ethereum_ssz = { workspace = true } +tempfile = { workspace = true } +tokio = { workspace = true } diff --git a/common/health_metrics/Cargo.toml b/common/health_metrics/Cargo.toml index 08591471b2..20a8c6e4e4 100644 --- a/common/health_metrics/Cargo.toml +++ b/common/health_metrics/Cargo.toml @@ -8,5 +8,5 @@ eth2 = { workspace = true } metrics = { workspace = true } [target.'cfg(target_os = "linux")'.dependencies] -psutil = "3.3.0" procfs = "0.15.1" +psutil = "3.3.0" diff --git a/common/logging/src/tracing_libp2p_discv5_logging_layer.rs b/common/logging/src/tracing_libp2p_discv5_logging_layer.rs index 90033d11ad..ef472ddc52 100644 --- a/common/logging/src/tracing_libp2p_discv5_logging_layer.rs +++ b/common/logging/src/tracing_libp2p_discv5_logging_layer.rs @@ -59,6 +59,7 @@ impl tracing_core::field::Visit for LogMessageExtractor { pub fn create_libp2p_discv5_tracing_layer( base_tracing_log_path: Option, max_log_size: u64, + file_mode: u32, ) -> Option { if let Some(mut tracing_log_path) = base_tracing_log_path { // Ensure that `tracing_log_path` only contains directories. 
@@ -75,12 +76,14 @@ pub fn create_libp2p_discv5_tracing_layer( let libp2p_writer = LogRollerBuilder::new(tracing_log_path.clone(), PathBuf::from("libp2p.log")) .rotation(Rotation::SizeBased(RotationSize::MB(max_log_size))) - .max_keep_files(1); + .max_keep_files(1) + .file_mode(file_mode); let discv5_writer = LogRollerBuilder::new(tracing_log_path.clone(), PathBuf::from("discv5.log")) .rotation(Rotation::SizeBased(RotationSize::MB(max_log_size))) - .max_keep_files(1); + .max_keep_files(1) + .file_mode(file_mode); let libp2p_writer = match libp2p_writer.build() { Ok(writer) => writer, diff --git a/common/malloc_utils/Cargo.toml b/common/malloc_utils/Cargo.toml index 64fb7b9aad..89973493b4 100644 --- a/common/malloc_utils/Cargo.toml +++ b/common/malloc_utils/Cargo.toml @@ -4,23 +4,23 @@ version = "0.1.0" authors = ["Paul Hauner "] edition = { workspace = true } +[features] +mallinfo2 = [] +jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"] +jemalloc-profiling = ["tikv-jemallocator/profiling"] + [dependencies] libc = "0.2.79" metrics = { workspace = true } parking_lot = { workspace = true } tikv-jemalloc-ctl = { version = "0.6.0", optional = true, features = ["stats"] } +[target.'cfg(not(target_os = "linux"))'.dependencies] +tikv-jemallocator = { version = "0.6.0", optional = true, features = ["stats"] } + # Jemalloc's background_threads feature requires Linux (pthreads). 
[target.'cfg(target_os = "linux")'.dependencies] tikv-jemallocator = { version = "0.6.0", optional = true, features = [ "stats", "background_threads", ] } - -[target.'cfg(not(target_os = "linux"))'.dependencies] -tikv-jemallocator = { version = "0.6.0", optional = true, features = ["stats"] } - -[features] -mallinfo2 = [] -jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"] -jemalloc-profiling = ["tikv-jemallocator/profiling"] diff --git a/consensus/merkle_proof/Cargo.toml b/consensus/merkle_proof/Cargo.toml index 2f721d917b..d750c05406 100644 --- a/consensus/merkle_proof/Cargo.toml +++ b/consensus/merkle_proof/Cargo.toml @@ -4,6 +4,9 @@ version = "0.2.0" authors = ["Michael Sproul "] edition = { workspace = true } +[features] +arbitrary = ["alloy-primitives/arbitrary"] + [dependencies] alloy-primitives = { workspace = true } ethereum_hashing = { workspace = true } @@ -13,6 +16,3 @@ safe_arith = { workspace = true } [dev-dependencies] quickcheck = { workspace = true } quickcheck_macros = { workspace = true } - -[features] -arbitrary = ["alloy-primitives/arbitrary"] diff --git a/consensus/state_processing/Cargo.toml b/consensus/state_processing/Cargo.toml index 502ffe3cf6..7ada4488f2 100644 --- a/consensus/state_processing/Cargo.toml +++ b/consensus/state_processing/Cargo.toml @@ -4,10 +4,18 @@ version = "0.2.0" authors = ["Paul Hauner ", "Michael Sproul "] edition = { workspace = true } -[dev-dependencies] -beacon_chain = { workspace = true } -env_logger = { workspace = true } -tokio = { workspace = true } +[features] +default = ["legacy-arith"] +fake_crypto = ["bls/fake_crypto"] +legacy-arith = ["types/legacy-arith"] +arbitrary-fuzz = [ + "types/arbitrary-fuzz", + "merkle_proof/arbitrary", + "ethereum_ssz/arbitrary", + "ssz_types/arbitrary", + "tree_hash/arbitrary", +] +portable = ["bls/supranational-portable"] [dependencies] arbitrary = { workspace = true } @@ -30,15 +38,7 @@ test_random_derive = { path = "../../common/test_random_derive" } tree_hash = { 
workspace = true } types = { workspace = true } -[features] -default = ["legacy-arith"] -fake_crypto = ["bls/fake_crypto"] -legacy-arith = ["types/legacy-arith"] -arbitrary-fuzz = [ - "types/arbitrary-fuzz", - "merkle_proof/arbitrary", - "ethereum_ssz/arbitrary", - "ssz_types/arbitrary", - "tree_hash/arbitrary", -] -portable = ["bls/supranational-portable"] +[dev-dependencies] +beacon_chain = { workspace = true } +env_logger = { workspace = true } +tokio = { workspace = true } diff --git a/consensus/swap_or_not_shuffle/Cargo.toml b/consensus/swap_or_not_shuffle/Cargo.toml index dac83e7553..b6fdc1a728 100644 --- a/consensus/swap_or_not_shuffle/Cargo.toml +++ b/consensus/swap_or_not_shuffle/Cargo.toml @@ -4,17 +4,17 @@ version = "0.2.0" authors = ["Paul Hauner "] edition = { workspace = true } -[[bench]] -name = "benches" -harness = false - -[dev-dependencies] -criterion = { workspace = true } +[features] +arbitrary = ["alloy-primitives/arbitrary"] [dependencies] alloy-primitives = { workspace = true } ethereum_hashing = { workspace = true } fixed_bytes = { workspace = true } -[features] -arbitrary = ["alloy-primitives/arbitrary"] +[dev-dependencies] +criterion = { workspace = true } + +[[bench]] +name = "benches" +harness = false diff --git a/consensus/types/Cargo.toml b/consensus/types/Cargo.toml index b58d4ef96f..ec6835defc 100644 --- a/consensus/types/Cargo.toml +++ b/consensus/types/Cargo.toml @@ -4,9 +4,15 @@ version = "0.2.1" authors = ["Paul Hauner ", "Age Manning "] edition = { workspace = true } -[[bench]] -name = "benches" -harness = false +[features] +default = ["sqlite", "legacy-arith"] +# Allow saturating arithmetic on slots and epochs. Enabled by default, but deprecated. +legacy-arith = [] +sqlite = ["dep:rusqlite"] +# The `arbitrary-fuzz` feature is a no-op provided for backwards compatibility. +# For simplicity `Arbitrary` is now derived regardless of the feature's presence. 
+arbitrary-fuzz = [] +portable = ["bls/supranational-portable"] [dependencies] alloy-primitives = { workspace = true } @@ -62,12 +68,6 @@ paste = { workspace = true } state_processing = { workspace = true } tokio = { workspace = true } -[features] -default = ["sqlite", "legacy-arith"] -# Allow saturating arithmetic on slots and epochs. Enabled by default, but deprecated. -legacy-arith = [] -sqlite = ["dep:rusqlite"] -# The `arbitrary-fuzz` feature is a no-op provided for backwards compatibility. -# For simplicity `Arbitrary` is now derived regardless of the feature's presence. -arbitrary-fuzz = [] -portable = ["bls/supranational-portable"] +[[bench]] +name = "benches" +harness = false diff --git a/crypto/bls/Cargo.toml b/crypto/bls/Cargo.toml index d02e01b80c..4661288679 100644 --- a/crypto/bls/Cargo.toml +++ b/crypto/bls/Cargo.toml @@ -4,6 +4,14 @@ version = "0.2.0" authors = ["Paul Hauner "] edition = { workspace = true } +[features] +arbitrary = [] +default = ["supranational"] +fake_crypto = [] +supranational = ["blst"] +supranational-portable = ["supranational", "blst/portable"] +supranational-force-adx = ["supranational", "blst/force-adx"] + [dependencies] alloy-primitives = { workspace = true } arbitrary = { workspace = true } @@ -18,11 +26,3 @@ safe_arith = { workspace = true } serde = { workspace = true } tree_hash = { workspace = true } zeroize = { workspace = true } - -[features] -arbitrary = [] -default = ["supranational"] -fake_crypto = [] -supranational = ["blst"] -supranational-portable = ["supranational", "blst/portable"] -supranational-force-adx = ["supranational", "blst/force-adx"] diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index 9acbe2569c..b39feb5011 100644 --- a/lcli/Cargo.toml +++ b/lcli/Cargo.toml @@ -5,6 +5,9 @@ version = "7.1.0-beta.0" authors = ["Paul Hauner "] edition = { workspace = true } +[package.metadata.cargo-udeps.ignore] +normal = ["malloc_utils"] + [features] portable = ["bls/supranational-portable"] fake_crypto = 
['bls/fake_crypto'] @@ -42,6 +45,3 @@ tracing-subscriber = { workspace = true } tree_hash = { workspace = true } types = { workspace = true } validator_dir = { workspace = true } - -[package.metadata.cargo-udeps.ignore] -normal = ["malloc_utils"] diff --git a/lcli/src/main.rs b/lcli/src/main.rs index 05f4900c46..105100aeb1 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -675,6 +675,7 @@ fn run(env_builder: EnvironmentBuilder, matches: &ArgMatches) -> extra_info: false, }, "", + 0o600, ); let env = env_builder diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index 04c8efcdba..cc17f638fd 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -6,6 +6,11 @@ edition = { workspace = true } autotests = false rust-version = "1.83.0" +# Prevent cargo-udeps from flagging the dummy package `target_check`, which exists only +# to assert properties of the compilation target. +[package.metadata.cargo-udeps.ignore] +normal = ["target_check"] + [features] default = ["slasher-lmdb", "beacon-node-leveldb"] # Writes debugging .ssz files to /tmp during block processing. @@ -32,12 +37,6 @@ beacon-node-redb = ["store/redb"] # Deprecated. This is now enabled by default on non windows targets. 
jemalloc = [] -[target.'cfg(not(target_os = "windows"))'.dependencies] -malloc_utils = { workspace = true, features = ["jemalloc"] } - -[target.'cfg(target_os = "windows")'.dependencies] -malloc_utils = { workspace = true } - [dependencies] account_manager = { "path" = "../account_manager" } account_utils = { workspace = true } @@ -69,6 +68,12 @@ unused_port = { workspace = true } validator_client = { workspace = true } validator_manager = { path = "../validator_manager" } +[target.'cfg(not(target_os = "windows"))'.dependencies] +malloc_utils = { workspace = true, features = ["jemalloc"] } + +[target.'cfg(target_os = "windows")'.dependencies] +malloc_utils = { workspace = true } + [dev-dependencies] beacon_node_fallback = { workspace = true } beacon_processor = { workspace = true } @@ -85,8 +90,3 @@ zeroize = { workspace = true } [[test]] name = "lighthouse_tests" path = "tests/main.rs" - -# Prevent cargo-udeps from flagging the dummy package `target_check`, which exists only -# to assert properties of the compilation target. 
-[package.metadata.cargo-udeps.ignore] -normal = ["target_check"] diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index 9b0284e06d..a66b7e128f 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -26,14 +26,7 @@ use types::{EthSpec, GnosisEthSpec, MainnetEthSpec, MinimalEthSpec}; #[cfg(target_family = "unix")] use { futures::Future, - std::{ - fs::{read_dir, set_permissions, Permissions}, - os::unix::fs::PermissionsExt, - path::Path, - pin::Pin, - task::Context, - task::Poll, - }, + std::{pin::Pin, task::Context, task::Poll}, tokio::signal::unix::{signal, Signal, SignalKind}, }; @@ -208,6 +201,7 @@ impl EnvironmentBuilder { mut self, config: LoggerConfig, logfile_prefix: &str, + file_mode: u32, ) -> ( Self, LoggingLayer, @@ -220,9 +214,6 @@ impl EnvironmentBuilder { _ => logfile_prefix, }; - #[cfg(target_family = "unix")] - let file_mode = if config.is_restricted { 0o600 } else { 0o644 }; - let file_logging_layer = match config.path { None => { eprintln!("No logfile path provided, logging to file is disabled"); @@ -239,7 +230,8 @@ impl EnvironmentBuilder { .max_keep_files(config.max_log_number.try_into().unwrap_or_else(|e| { eprintln!("Failed to convert max_log_number to u64: {}", e); 10 - })); + })) + .file_mode(file_mode); if config.compression { appender = appender.compression(Compression::Gzip); @@ -247,9 +239,6 @@ impl EnvironmentBuilder { match appender.build() { Ok(file_appender) => { - #[cfg(target_family = "unix")] - set_logfile_permissions(&path, filename_prefix, file_mode); - let (writer, guard) = tracing_appender::non_blocking(file_appender); Some(LoggingLayer::new( writer, @@ -543,37 +532,3 @@ impl Future for SignalFuture { } } } - -#[cfg(target_family = "unix")] -fn set_logfile_permissions(log_dir: &Path, filename_prefix: &str, file_mode: u32) { - let newest = read_dir(log_dir) - .ok() - .into_iter() - .flat_map(|entries| entries.filter_map(Result::ok)) - .filter_map(|entry| { - 
let path = entry.path(); - let fname = path.file_name()?.to_string_lossy(); - if path.is_file() && fname.starts_with(filename_prefix) && fname.ends_with(".log") { - let modified = entry.metadata().ok()?.modified().ok()?; - Some((path, modified)) - } else { - None - } - }) - .max_by_key(|(_path, mtime)| *mtime); - - match newest { - Some((file, _mtime)) => { - if let Err(e) = set_permissions(&file, Permissions::from_mode(file_mode)) { - eprintln!("Failed to set permissions on {}: {}", file.display(), e); - } - } - None => { - eprintln!( - "Couldn't find a newly created logfile in {} matching prefix \"{}\".", - log_dir.display(), - filename_prefix - ); - } - } -} diff --git a/lighthouse/environment/src/tracing_common.rs b/lighthouse/environment/src/tracing_common.rs index b1e5078af1..d78eb0d85a 100644 --- a/lighthouse/environment/src/tracing_common.rs +++ b/lighthouse/environment/src/tracing_common.rs @@ -33,8 +33,14 @@ pub fn construct_logger( let subcommand_name = matches.subcommand_name(); let logfile_prefix = subcommand_name.unwrap_or("lighthouse"); + let file_mode = if logger_config.is_restricted { + 0o600 + } else { + 0o644 + }; + let (builder, stdout_logging_layer, file_logging_layer, sse_logging_layer_opt) = - environment_builder.init_tracing(logger_config.clone(), logfile_prefix); + environment_builder.init_tracing(logger_config.clone(), logfile_prefix, file_mode); let libp2p_discv5_layer = if let Some(subcommand_name) = subcommand_name { if subcommand_name == "beacon_node" @@ -49,6 +55,7 @@ pub fn construct_logger( create_libp2p_discv5_tracing_layer( logger_config.path.clone(), logger_config.max_log_size, + file_mode, ) } } else { diff --git a/testing/execution_engine_integration/Cargo.toml b/testing/execution_engine_integration/Cargo.toml index 55c42eb9d3..07d8d98f1d 100644 --- a/testing/execution_engine_integration/Cargo.toml +++ b/testing/execution_engine_integration/Cargo.toml @@ -3,6 +3,9 @@ name = "execution_engine_integration" version = "0.1.0" 
edition = { workspace = true } +[features] +portable = ["types/portable"] + [dependencies] async-channel = { workspace = true } deposit_contract = { workspace = true } @@ -23,6 +26,3 @@ tempfile = { workspace = true } tokio = { workspace = true } types = { workspace = true } unused_port = { workspace = true } - -[features] -portable = ["types/portable"] diff --git a/testing/state_transition_vectors/Cargo.toml b/testing/state_transition_vectors/Cargo.toml index 7c29715346..66376f0a51 100644 --- a/testing/state_transition_vectors/Cargo.toml +++ b/testing/state_transition_vectors/Cargo.toml @@ -3,6 +3,9 @@ name = "state_transition_vectors" version = "0.1.0" authors = ["Paul Hauner "] edition = { workspace = true } + +[features] +portable = ["beacon_chain/portable"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] @@ -11,6 +14,3 @@ ethereum_ssz = { workspace = true } state_processing = { workspace = true } tokio = { workspace = true } types = { workspace = true } - -[features] -portable = ["beacon_chain/portable"] diff --git a/testing/web3signer_tests/Cargo.toml b/testing/web3signer_tests/Cargo.toml index f68fa56e16..b4637b4030 100644 --- a/testing/web3signer_tests/Cargo.toml +++ b/testing/web3signer_tests/Cargo.toml @@ -10,6 +10,7 @@ edition = { workspace = true } account_utils = { workspace = true } async-channel = { workspace = true } environment = { workspace = true } +eth2 = { workspace = true } eth2_keystore = { workspace = true } eth2_network_config = { workspace = true } futures = { workspace = true } diff --git a/testing/web3signer_tests/src/lib.rs b/testing/web3signer_tests/src/lib.rs index 8678eff0ee..4bc0f62346 100644 --- a/testing/web3signer_tests/src/lib.rs +++ b/testing/web3signer_tests/src/lib.rs @@ -20,6 +20,7 @@ mod tests { use account_utils::validator_definitions::{ SigningDefinition, ValidatorDefinition, ValidatorDefinitions, Web3SignerDefinition, }; + use eth2::types::FullBlockContents; 
use eth2_keystore::KeystoreBuilder; use eth2_network_config::Eth2NetworkConfig; use initialized_validators::{ @@ -45,7 +46,9 @@ mod tests { use tokio::time::sleep; use types::{attestation::AttestationBase, *}; use url::Url; - use validator_store::{Error as ValidatorStoreError, SignedBlock, ValidatorStore}; + use validator_store::{ + Error as ValidatorStoreError, SignedBlock, UnsignedBlock, ValidatorStore, + }; /// If the we are unable to reach the Web3Signer HTTP API within this time out then we will /// assume it failed to start. @@ -595,8 +598,9 @@ mod tests { async move { let block = BeaconBlock::::Base(BeaconBlockBase::empty(&spec)); let block_slot = block.slot(); + let unsigned_block = UnsignedBlock::Full(FullBlockContents::Block(block)); validator_store - .sign_block(pubkey, block.into(), block_slot) + .sign_block(pubkey, unsigned_block, block_slot) .await .unwrap() } @@ -665,12 +669,10 @@ mod tests { async move { let mut altair_block = BeaconBlockAltair::empty(&spec); altair_block.slot = altair_fork_slot; + let unsigned_block = + UnsignedBlock::Full(FullBlockContents::Block(altair_block.into())); validator_store - .sign_block( - pubkey, - BeaconBlock::::Altair(altair_block).into(), - altair_fork_slot, - ) + .sign_block(pubkey, unsigned_block, altair_fork_slot) .await .unwrap() } @@ -752,12 +754,10 @@ mod tests { async move { let mut bellatrix_block = BeaconBlockBellatrix::empty(&spec); bellatrix_block.slot = bellatrix_fork_slot; + let unsigned_block = + UnsignedBlock::Full(FullBlockContents::Block(bellatrix_block.into())); validator_store - .sign_block( - pubkey, - BeaconBlock::::Bellatrix(bellatrix_block).into(), - bellatrix_fork_slot, - ) + .sign_block(pubkey, unsigned_block, bellatrix_fork_slot) .await .unwrap() } @@ -876,8 +876,9 @@ mod tests { .assert_signatures_match("first_block", |pubkey, validator_store| async move { let block = first_block(); let slot = block.slot(); + let unsigned_block = UnsignedBlock::Full(FullBlockContents::Block(block)); 
validator_store - .sign_block(pubkey, block.into(), slot) + .sign_block(pubkey, unsigned_block, slot) .await .unwrap() }) @@ -887,8 +888,9 @@ mod tests { move |pubkey, validator_store| async move { let block = double_vote_block(); let slot = block.slot(); + let unsigned_block = UnsignedBlock::Full(FullBlockContents::Block(block)); validator_store - .sign_block(pubkey, block.into(), slot) + .sign_block(pubkey, unsigned_block, slot) .await .map(|_| ()) }, diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index d07f95f11c..2cb6ba435e 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -1,5 +1,6 @@ use account_utils::validator_definitions::{PasswordStorage, ValidatorDefinition}; use doppelganger_service::DoppelgangerService; +use eth2::types::PublishBlockRequest; use initialized_validators::InitializedValidators; use logging::crit; use parking_lot::{Mutex, RwLock}; @@ -733,14 +734,18 @@ impl ValidatorStore for LighthouseValidatorS current_slot: Slot, ) -> Result, Error> { match block { - UnsignedBlock::Full(block) => self - .sign_abstract_block(validator_pubkey, block, current_slot) - .await - .map(SignedBlock::Full), + UnsignedBlock::Full(block) => { + let (block, blobs) = block.deconstruct(); + self.sign_abstract_block(validator_pubkey, block, current_slot) + .await + .map(|block| { + SignedBlock::Full(PublishBlockRequest::new(Arc::new(block), blobs)) + }) + } UnsignedBlock::Blinded(block) => self .sign_abstract_block(validator_pubkey, block, current_slot) .await - .map(SignedBlock::Blinded), + .map(|block| SignedBlock::Blinded(Arc::new(block))), } } diff --git a/validator_client/slashing_protection/Cargo.toml b/validator_client/slashing_protection/Cargo.toml index 88e6dd794d..3860af514d 100644 --- a/validator_client/slashing_protection/Cargo.toml +++ b/validator_client/slashing_protection/Cargo.toml @@ -5,9 +5,9 
@@ authors = ["Michael Sproul ", "pscott ProductionValidatorClient { info!("Doppelganger protection disabled.") } - spawn_notifier(self).map_err(|e| format!("Failed to start notifier: {}", e))?; + let context = self.context.service_context("notifier".into()); + spawn_notifier( + self.duties_service.clone(), + context.executor, + &self.context.eth2_config.spec, + ) + .map_err(|e| format!("Failed to start notifier: {}", e))?; if self.config.enable_latency_measurement_service { - latency::start_latency_service( - self.context.clone(), + latency_service::start_latency_service( + self.context.executor.clone(), self.duties_service.slot_clock.clone(), self.duties_service.beacon_nodes.clone(), ); diff --git a/validator_client/validator_services/Cargo.toml b/validator_client/validator_services/Cargo.toml index 86208dadef..c914940914 100644 --- a/validator_client/validator_services/Cargo.toml +++ b/validator_client/validator_services/Cargo.toml @@ -14,11 +14,11 @@ graffiti_file = { workspace = true } logging = { workspace = true } parking_lot = { workspace = true } safe_arith = { workspace = true } -slot_clock = { workspace = true } +slot_clock = { workspace = true } task_executor = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true } tracing = { workspace = true } -tree_hash = { workspace = true } -types = { workspace = true } +tree_hash = { workspace = true } +types = { workspace = true } validator_metrics = { workspace = true } validator_store = { workspace = true } diff --git a/validator_client/validator_services/src/block_service.rs b/validator_client/validator_services/src/block_service.rs index 2f29c1feb7..01f786e160 100644 --- a/validator_client/validator_services/src/block_service.rs +++ b/validator_client/validator_services/src/block_service.rs @@ -1,6 +1,5 @@ use beacon_node_fallback::{ApiTopic, BeaconNodeFallback, Error as FallbackError, Errors}; use bls::SignatureBytes; -use eth2::types::{FullBlockContents, PublishBlockRequest}; use 
eth2::{BeaconNodeHttpClient, StatusCode}; use graffiti_file::{determine_graffiti, GraffitiFile}; use logging::crit; @@ -13,11 +12,8 @@ use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::mpsc; use tracing::{debug, error, info, trace, warn}; -use types::{ - BlindedBeaconBlock, BlockType, ChainSpec, EthSpec, Graffiti, PublicKeyBytes, - SignedBlindedBeaconBlock, Slot, -}; -use validator_store::{Error as ValidatorStoreError, ValidatorStore}; +use types::{BlockType, ChainSpec, EthSpec, Graffiti, PublicKeyBytes, Slot}; +use validator_store::{Error as ValidatorStoreError, SignedBlock, UnsignedBlock, ValidatorStore}; #[derive(Debug)] pub enum BlockError { @@ -335,26 +331,10 @@ impl BlockService { ) -> Result<(), BlockError> { let signing_timer = validator_metrics::start_timer(&validator_metrics::BLOCK_SIGNING_TIMES); - let (block, maybe_blobs) = match unsigned_block { - UnsignedBlock::Full(block_contents) => { - let (block, maybe_blobs) = block_contents.deconstruct(); - (block.into(), maybe_blobs) - } - UnsignedBlock::Blinded(block) => (block.into(), None), - }; - let res = self .validator_store - .sign_block(*validator_pubkey, block, slot) - .await - .map(|block| match block { - validator_store::SignedBlock::Full(block) => { - SignedBlock::Full(PublishBlockRequest::new(Arc::new(block), maybe_blobs)) - } - validator_store::SignedBlock::Blinded(block) => { - SignedBlock::Blinded(Arc::new(block)) - } - }); + .sign_block(*validator_pubkey, unsigned_block, slot) + .await; let signed_block = match res { Ok(block) => block, @@ -398,12 +378,13 @@ impl BlockService { }) .await?; + let metadata = BlockMetadata::from(&signed_block); info!( - block_type = ?signed_block.block_type(), - deposits = signed_block.num_deposits(), - attestations = signed_block.num_attestations(), + block_type = ?metadata.block_type, + deposits = metadata.num_deposits, + attestations = metadata.num_attestations, graffiti = ?graffiti.map(|g| g.as_utf8_lossy()), - slot = 
signed_block.slot().as_u64(), + slot = metadata.slot.as_u64(), "Successfully published block" ); Ok(()) @@ -508,7 +489,6 @@ impl BlockService { signed_block: &SignedBlock, beacon_node: BeaconNodeHttpClient, ) -> Result<(), BlockError> { - let slot = signed_block.slot(); match signed_block { SignedBlock::Full(signed_block) => { let _post_timer = validator_metrics::start_timer_vec( @@ -518,7 +498,9 @@ impl BlockService { beacon_node .post_beacon_blocks_v2_ssz(signed_block, None) .await - .or_else(|e| handle_block_post_error(e, slot))? + .or_else(|e| { + handle_block_post_error(e, signed_block.signed_block().message().slot()) + })? } SignedBlock::Blinded(signed_block) => { let _post_timer = validator_metrics::start_timer_vec( @@ -528,7 +510,7 @@ impl BlockService { beacon_node .post_beacon_blinded_blocks_v2_ssz(signed_block, None) .await - .or_else(|e| handle_block_post_error(e, slot))? + .or_else(|e| handle_block_post_error(e, signed_block.message().slot()))? } } Ok::<_, BlockError>(()) @@ -557,13 +539,17 @@ impl BlockService { )) })?; - let unsigned_block = match block_response.data { - eth2::types::ProduceBlockV3Response::Full(block) => UnsignedBlock::Full(block), - eth2::types::ProduceBlockV3Response::Blinded(block) => UnsignedBlock::Blinded(block), + let (block_proposer, unsigned_block) = match block_response.data { + eth2::types::ProduceBlockV3Response::Full(block) => { + (block.block().proposer_index(), UnsignedBlock::Full(block)) + } + eth2::types::ProduceBlockV3Response::Blinded(block) => { + (block.proposer_index(), UnsignedBlock::Blinded(block)) + } }; info!(slot = slot.as_u64(), "Received unsigned block"); - if proposer_index != Some(unsigned_block.proposer_index()) { + if proposer_index != Some(block_proposer) { return Err(BlockError::Recoverable( "Proposer index does not match block proposer. 
Beacon chain re-orged".to_string(), )); @@ -573,49 +559,30 @@ impl BlockService { } } -pub enum UnsignedBlock { - Full(FullBlockContents), - Blinded(BlindedBeaconBlock), +/// Wrapper for values we want to log about a block we signed, for easy extraction from the possible +/// variants. +struct BlockMetadata { + block_type: BlockType, + slot: Slot, + num_deposits: usize, + num_attestations: usize, } -impl UnsignedBlock { - pub fn proposer_index(&self) -> u64 { - match self { - UnsignedBlock::Full(block) => block.block().proposer_index(), - UnsignedBlock::Blinded(block) => block.proposer_index(), - } - } -} - -#[derive(Debug)] -pub enum SignedBlock { - Full(PublishBlockRequest), - Blinded(Arc>), -} - -impl SignedBlock { - pub fn block_type(&self) -> BlockType { - match self { - SignedBlock::Full(_) => BlockType::Full, - SignedBlock::Blinded(_) => BlockType::Blinded, - } - } - pub fn slot(&self) -> Slot { - match self { - SignedBlock::Full(block) => block.signed_block().message().slot(), - SignedBlock::Blinded(block) => block.message().slot(), - } - } - pub fn num_deposits(&self) -> usize { - match self { - SignedBlock::Full(block) => block.signed_block().message().body().deposits().len(), - SignedBlock::Blinded(block) => block.message().body().deposits().len(), - } - } - pub fn num_attestations(&self) -> usize { - match self { - SignedBlock::Full(block) => block.signed_block().message().body().attestations_len(), - SignedBlock::Blinded(block) => block.message().body().attestations_len(), +impl From<&SignedBlock> for BlockMetadata { + fn from(value: &SignedBlock) -> Self { + match value { + SignedBlock::Full(block) => BlockMetadata { + block_type: BlockType::Full, + slot: block.signed_block().message().slot(), + num_deposits: block.signed_block().message().body().deposits().len(), + num_attestations: block.signed_block().message().body().attestations_len(), + }, + SignedBlock::Blinded(block) => BlockMetadata { + block_type: BlockType::Blinded, + slot: 
block.message().slot(), + num_deposits: block.message().body().deposits().len(), + num_attestations: block.message().body().attestations_len(), + }, } } } diff --git a/validator_client/src/latency.rs b/validator_client/validator_services/src/latency_service.rs similarity index 91% rename from validator_client/src/latency.rs rename to validator_client/validator_services/src/latency_service.rs index 2382d350af..c810a03a80 100644 --- a/validator_client/src/latency.rs +++ b/validator_client/validator_services/src/latency_service.rs @@ -1,10 +1,9 @@ use beacon_node_fallback::BeaconNodeFallback; -use environment::RuntimeContext; use slot_clock::SlotClock; use std::sync::Arc; +use task_executor::TaskExecutor; use tokio::time::sleep; use tracing::debug; -use types::EthSpec; /// The latency service will run 11/12ths of the way through the slot. pub const SLOT_DELAY_MULTIPLIER: u32 = 11; @@ -12,8 +11,8 @@ pub const SLOT_DELAY_DENOMINATOR: u32 = 12; /// Starts a service that periodically checks the latency between the VC and the /// candidate BNs. 
-pub fn start_latency_service( - context: RuntimeContext, +pub fn start_latency_service( + executor: TaskExecutor, slot_clock: T, beacon_nodes: Arc>, ) { @@ -57,5 +56,5 @@ pub fn start_latency_service( } }; - context.executor.spawn(future, "latency"); + executor.spawn(future, "latency"); } diff --git a/validator_client/validator_services/src/lib.rs b/validator_client/validator_services/src/lib.rs index abf8fab3cb..3b8bd9ae14 100644 --- a/validator_client/validator_services/src/lib.rs +++ b/validator_client/validator_services/src/lib.rs @@ -1,6 +1,8 @@ pub mod attestation_service; pub mod block_service; pub mod duties_service; +pub mod latency_service; +pub mod notifier_service; pub mod preparation_service; pub mod sync; pub mod sync_committee_service; diff --git a/validator_client/src/notifier.rs b/validator_client/validator_services/src/notifier_service.rs similarity index 86% rename from validator_client/src/notifier.rs rename to validator_client/validator_services/src/notifier_service.rs index 05f1c919d2..6b8ea04edb 100644 --- a/validator_client/src/notifier.rs +++ b/validator_client/validator_services/src/notifier_service.rs @@ -1,18 +1,20 @@ -use crate::{DutiesService, ProductionValidatorClient}; -use lighthouse_validator_store::LighthouseValidatorStore; -use metrics::set_gauge; +use crate::duties_service::DutiesService; use slot_clock::SlotClock; +use std::sync::Arc; +use task_executor::TaskExecutor; use tokio::time::{sleep, Duration}; use tracing::{debug, error, info}; -use types::EthSpec; +use types::{ChainSpec, EthSpec}; +use validator_metrics::set_gauge; +use validator_store::ValidatorStore; /// Spawns a notifier service which periodically logs information about the node. 
-pub fn spawn_notifier(client: &ProductionValidatorClient) -> Result<(), String> { - let context = client.context.service_context("notifier".into()); - let executor = context.executor.clone(); - let duties_service = client.duties_service.clone(); - - let slot_duration = Duration::from_secs(context.eth2_config.spec.seconds_per_slot); +pub fn spawn_notifier( + duties_service: Arc>, + executor: TaskExecutor, + spec: &ChainSpec, +) -> Result<(), String> { + let slot_duration = Duration::from_secs(spec.seconds_per_slot); let interval_fut = async move { loop { @@ -33,9 +35,7 @@ pub fn spawn_notifier(client: &ProductionValidatorClient) -> Resu } /// Performs a single notification routine. -async fn notify( - duties_service: &DutiesService, T>, -) { +async fn notify(duties_service: &DutiesService) { let (candidate_info, num_available, num_synced) = duties_service.beacon_nodes.get_notifier_info().await; let num_total = candidate_info.len(); @@ -102,7 +102,7 @@ async fn notify( } if let Some(slot) = duties_service.slot_clock.now() { - let epoch = slot.epoch(E::slots_per_epoch()); + let epoch = slot.epoch(S::E::slots_per_epoch()); let total_validators = duties_service.total_validator_count(); let proposing_validators = duties_service.proposer_count(epoch); diff --git a/validator_client/validator_store/Cargo.toml b/validator_client/validator_store/Cargo.toml index 91df9dc3ab..8c5451b2d0 100644 --- a/validator_client/validator_store/Cargo.toml +++ b/validator_client/validator_store/Cargo.toml @@ -5,5 +5,6 @@ edition = { workspace = true } authors = ["Sigma Prime "] [dependencies] +eth2 = { workspace = true } slashing_protection = { workspace = true } types = { workspace = true } diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index 9de3a6d66a..c3b551c249 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -1,12 +1,13 @@ +use eth2::types::{FullBlockContents, 
PublishBlockRequest}; use slashing_protection::NotSafe; use std::fmt::Debug; use std::future::Future; +use std::sync::Arc; use types::{ - Address, Attestation, AttestationError, BeaconBlock, BlindedBeaconBlock, Epoch, EthSpec, - Graffiti, Hash256, PublicKeyBytes, SelectionProof, Signature, SignedAggregateAndProof, - SignedBeaconBlock, SignedBlindedBeaconBlock, SignedContributionAndProof, - SignedValidatorRegistrationData, Slot, SyncCommitteeContribution, SyncCommitteeMessage, - SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, + Address, Attestation, AttestationError, BlindedBeaconBlock, Epoch, EthSpec, Graffiti, Hash256, + PublicKeyBytes, SelectionProof, Signature, SignedAggregateAndProof, SignedBlindedBeaconBlock, + SignedContributionAndProof, SignedValidatorRegistrationData, Slot, SyncCommitteeContribution, + SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, }; #[derive(Debug, PartialEq, Clone)] @@ -170,40 +171,16 @@ pub trait ValidatorStore: Send + Sync { fn proposal_data(&self, pubkey: &PublicKeyBytes) -> Option; } -#[derive(Clone, Debug, PartialEq)] +#[derive(Debug)] pub enum UnsignedBlock { - Full(BeaconBlock), + Full(FullBlockContents), Blinded(BlindedBeaconBlock), } -impl From> for UnsignedBlock { - fn from(block: BeaconBlock) -> Self { - UnsignedBlock::Full(block) - } -} - -impl From> for UnsignedBlock { - fn from(block: BlindedBeaconBlock) -> Self { - UnsignedBlock::Blinded(block) - } -} - #[derive(Clone, Debug, PartialEq)] pub enum SignedBlock { - Full(SignedBeaconBlock), - Blinded(SignedBlindedBeaconBlock), -} - -impl From> for SignedBlock { - fn from(block: SignedBeaconBlock) -> Self { - SignedBlock::Full(block) - } -} - -impl From> for SignedBlock { - fn from(block: SignedBlindedBeaconBlock) -> Self { - SignedBlock::Blinded(block) - } + Full(PublishBlockRequest), + Blinded(Arc>), } /// A wrapper around `PublicKeyBytes` which encodes information about the status of a validator