diff --git a/Cargo.lock b/Cargo.lock index fac8f42232..bb53ce8330 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1232,8 +1232,8 @@ dependencies = [ "eth2_network_config", "ethereum_hashing", "ethereum_serde_utils", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.1", + "ethereum_ssz_derive 0.10.1", "execution_layer", "fixed_bytes", "fork_choice", @@ -1259,10 +1259,12 @@ dependencies = [ "proto_array", "rand 0.9.2", "rayon", + "reth-era", "safe_arith", "sensitive_url", "serde", "serde_json", + "serde_yaml", "slasher", "slot_clock", "smallvec", @@ -1508,7 +1510,7 @@ dependencies = [ "blst", "ethereum_hashing", "ethereum_serde_utils", - "ethereum_ssz", + "ethereum_ssz 0.10.1", "fixed_bytes", "hex", "rand 0.9.2", @@ -1555,7 +1557,7 @@ dependencies = [ "clap", "clap_utils", "eth2_network_config", - "ethereum_ssz", + "ethereum_ssz 0.10.1", "hex", "lighthouse_network", "log", @@ -1613,7 +1615,7 @@ dependencies = [ "bls", "context_deserialize", "eth2", - "ethereum_ssz", + "ethereum_ssz 0.10.1", "lighthouse_version", "mockito", "reqwest", @@ -1881,7 +1883,7 @@ dependencies = [ "clap", "dirs", "eth2_network_config", - "ethereum_ssz", + "ethereum_ssz 0.10.1", "hex", "serde", "serde_json", @@ -1900,7 +1902,7 @@ dependencies = [ "environment", "eth2", "eth2_config", - "ethereum_ssz", + "ethereum_ssz 0.10.1", "execution_layer", "futures", "genesis", @@ -2334,6 +2336,16 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "darling" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +dependencies = [ + "darling_core 0.20.11", + "darling_macro 0.20.11", +] + [[package]] name = "darling" version = "0.21.3" @@ -2354,6 +2366,20 @@ dependencies = [ "darling_macro 0.23.0", ] +[[package]] +name = "darling_core" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.111", +] + [[package]] name = "darling_core" version = "0.21.3" @@ -2382,6 +2408,17 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "darling_macro" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +dependencies = [ + "darling_core 0.20.11", + "quote", + "syn 2.0.111", +] + [[package]] name = "darling_macro" version = "0.21.3" @@ -2506,7 +2543,7 @@ dependencies = [ "alloy-json-abi", "alloy-primitives", "bls", - "ethereum_ssz", + "ethereum_ssz 0.10.1", "hex", "reqwest", "serde_json", @@ -2842,8 +2879,8 @@ dependencies = [ "context_deserialize", "educe", "eth2_network_config", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.1", + "ethereum_ssz_derive 0.10.1", "execution_layer", "fork_choice", "fs2", @@ -3123,8 +3160,8 @@ dependencies = [ "enr", "eth2_keystore", "ethereum_serde_utils", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.1", + "ethereum_ssz_derive 0.10.1", "futures", "futures-util", "libp2p-identity", @@ -3210,7 +3247,7 @@ dependencies = [ "bytes", "discv5", "eth2_config", - "ethereum_ssz", + "ethereum_ssz 0.10.1", "fixed_bytes", "kzg", "pretty_reqwest_error", @@ -3275,6 +3312,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "ethereum_ssz" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dcddb2554d19cde19b099fadddde576929d7a4d0c1cd3512d1fd95cf174375c" +dependencies = [ + "alloy-primitives", + "ethereum_serde_utils", + "itertools 0.13.0", + "serde", + "serde_derive", + "smallvec", + "typenum", +] + [[package]] name = "ethereum_ssz" version = "0.10.1" @@ -3292,6 +3344,18 @@ dependencies = [ "typenum", ] +[[package]] +name = "ethereum_ssz_derive" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a657b6b3b7e153637dc6bdc6566ad9279d9ee11a15b12cfb24a2e04360637e9f" +dependencies = [ + "darling 0.20.11", + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "ethereum_ssz_derive" version = "0.10.1" @@ -3385,7 +3449,7 @@ dependencies = [ "bytes", "eth2", "ethereum_serde_utils", - "ethereum_ssz", + "ethereum_ssz 0.10.1", "fixed_bytes", "fork_choice", "hash-db", @@ -3583,8 +3647,8 @@ name = "fork_choice" version = "0.1.0" dependencies = [ "beacon_chain", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.1", + "ethereum_ssz_derive 0.10.1", "fixed_bytes", "logging", "metrics", @@ -3778,7 +3842,7 @@ version = "0.2.0" dependencies = [ "bls", "ethereum_hashing", - "ethereum_ssz", + "ethereum_ssz 0.10.1", "int_to_bytes", "merkle_proof", "rayon", @@ -4198,7 +4262,7 @@ dependencies = [ "either", "eth2", "ethereum_serde_utils", - "ethereum_ssz", + "ethereum_ssz 0.10.1", "execution_layer", "fixed_bytes", "futures", @@ -4839,8 +4903,8 @@ dependencies = [ "educe", "ethereum_hashing", "ethereum_serde_utils", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.1", + "ethereum_ssz_derive 0.10.1", "hex", "rayon", "rust_eth_kzg", @@ -4880,7 +4944,7 @@ dependencies = [ "eth2_network_config", "eth2_wallet", "ethereum_hashing", - "ethereum_ssz", + "ethereum_ssz 0.10.1", "execution_layer", "fixed_bytes", "hex", @@ -5417,8 +5481,8 @@ dependencies = [ "discv5", "either", "eth2", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.1", + "ethereum_ssz_derive 0.10.1", "fixed_bytes", "fnv", "futures", @@ -5761,8 +5825,8 @@ dependencies = [ "context_deserialize", "educe", "ethereum_hashing", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.1", + "ethereum_ssz_derive 0.10.1", "itertools 0.13.0", "parking_lot", "rayon", @@ -6062,7 +6126,7 @@ dependencies = [ "educe", "eth2", "eth2_network_config", - "ethereum_ssz", + "ethereum_ssz 0.10.1", "execution_layer", "fixed_bytes", "fnv", @@ -6457,8 +6521,8 @@ dependencies = [ "bitvec", "bls", "educe", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.1", + "ethereum_ssz_derive 0.10.1", "fixed_bytes", "itertools 0.14.0", "maplit", @@ -7023,8 +7087,8 @@ dependencies = [ name = "proto_array" version = "0.2.0" dependencies = [ - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.1", + "ethereum_ssz_derive 0.10.1", "fixed_bytes", "safe_arith", "serde", @@ -7450,6 +7514,21 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e061d1b48cb8d38042de4ae0a7a6401009d6143dc80d2e2d6f31f0bdd6470c7" +[[package]] +name = "reth-era" +version = "1.9.3" +source = "git+https://github.com/paradigmxyz/reth.git?rev=62abfdaeb54e8a205a8ee085ddebd56047d93374#62abfdaeb54e8a205a8ee085ddebd56047d93374" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-primitives", + "alloy-rlp", + "ethereum_ssz 0.9.1", + "ethereum_ssz_derive 0.9.1", + "snap", + "thiserror 2.0.17", +] + [[package]] name = "rfc6979" version = "0.4.0" @@ -8242,8 +8321,8 @@ dependencies = [ "bls", "byteorder", "educe", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.1", + "ethereum_ssz_derive 0.10.1", "filesystem", "fixed_bytes", "flate2", @@ -8407,7 +8486,7 @@ dependencies = [ "context_deserialize", "educe", "ethereum_serde_utils", - "ethereum_ssz", + "ethereum_ssz 0.10.1", "itertools 0.14.0", "serde", "serde_derive", @@ -8431,8 +8510,8 @@ dependencies = [ "bls", "educe", "ethereum_hashing", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.1", + "ethereum_ssz_derive 0.10.1", "fixed_bytes", "int_to_bytes", "integer-sqrt", @@ -8459,7 +8538,7 @@ version = "0.1.0" dependencies = [ "beacon_chain", "bls", - "ethereum_ssz", + "ethereum_ssz 0.10.1", "fixed_bytes", "state_processing", "tokio", @@ -8481,8 +8560,8 @@ dependencies = [ "criterion", "db-key", "directory", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.1", + "ethereum_ssz_derive 0.10.1", "fixed_bytes", "itertools 0.14.0", "leveldb", @@ -9285,7 +9364,7 @@ checksum = "f7fd51aa83d2eb83b04570808430808b5d24fdbf479a4d5ac5dee4a2e2dd2be4" dependencies = [ "alloy-primitives", "ethereum_hashing", - "ethereum_ssz", + "ethereum_ssz 0.10.1", "smallvec", "typenum", ] @@ -9350,8 +9429,8 @@ dependencies = [ "eth2_interop_keypairs", "ethereum_hashing", "ethereum_serde_utils", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.1", + "ethereum_ssz_derive 0.10.1", "fixed_bytes", "hex", "int_to_bytes", diff --git a/Cargo.toml b/Cargo.toml index 222392bcb7..e980ee220f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -207,6 +207,7 @@ rand = "0.9.0" rayon = "1.7" regex = "1" reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "stream", "rustls-tls"] } +reth-era = { git = "https://github.com/paradigmxyz/reth.git", rev = "62abfdaeb54e8a205a8ee085ddebd56047d93374" } ring = "0.17" rpds = "0.11" rusqlite = { version = "0.38", features = ["bundled"] } diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index a06db8934b..b63115cb3f 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -50,6 +50,7 @@ parking_lot = { workspace = true } proto_array = { workspace = true } rand = { workspace = true } rayon = { workspace = true } +reth-era = { workspace = true } safe_arith = { workspace = true } sensitive_url = { workspace = true } serde = { workspace = true } @@ -79,6 +80,7 @@ maplit = { workspace = true } mockall = { workspace = true } mockall_double = { workspace = true } serde_json = { workspace = true } +serde_yaml = { workspace = true } [[bench]] name = "benches" diff --git a/beacon_node/beacon_chain/src/era/consumer.rs b/beacon_node/beacon_chain/src/era/consumer.rs new file mode 100644 index 0000000000..a4e841543b --- /dev/null +++ b/beacon_node/beacon_chain/src/era/consumer.rs @@ -0,0 +1,449 @@ +use bls::FixedBytesExtended; +use rayon::prelude::*; +use reth_era::common::file_ops::StreamReader; +use reth_era::era::file::EraReader; +use reth_era::era::types::consensus::{CompressedBeaconState, CompressedSignedBeaconBlock}; +use std::fs::{self, File}; +use std::path::{Path, PathBuf}; +use store::{DBColumn, HotColdDB, ItemStore, KeyValueStoreOp}; +use tracing::{debug, debug_span, instrument, warn}; +use tree_hash::TreeHash; +use types::{ + BeaconState, ChainSpec, EthSpec, Hash256, HistoricalBatch, HistoricalSummary, + SignedBeaconBlock, Slot, +}; + +fn decode_block( + compressed: CompressedSignedBeaconBlock, + spec: &ChainSpec, +) -> Result, String> { + let bytes = compressed + .decompress() + .map_err(|error| format!("failed to decompress block: {error:?}"))?; + SignedBeaconBlock::from_ssz_bytes(&bytes, spec) + .map_err(|error| format!("failed to decode block: {error:?}")) +} + +fn decode_state( + compressed: CompressedBeaconState, + spec: &ChainSpec, +) -> Result, String> { + let bytes = compressed + .decompress() + .map_err(|error| format!("failed to decompress state: {error:?}"))?; + BeaconState::from_ssz_bytes(&bytes, spec) + .map_err(|error| format!("failed to decode state: {error:?}")) +} + +pub struct EraFileDir { + dir: PathBuf, + network_name: String, + genesis_validators_root: Hash256, + historical_roots: Vec, + historical_summaries: Vec, + max_era: u64, +} + +impl EraFileDir { + pub fn new(era_files_dir: &Path, spec: &ChainSpec) -> Result { + let mut era_files = list_era_files(era_files_dir)?; + era_files.sort_by_key(|(era_number, _)| *era_number); + + let network_name = spec + .config_name + .clone() + .unwrap_or_else(|| "unknown".to_string()); + + let Some((max_era, reference_path)) = era_files.last().cloned() else { + return Err("era files directory is empty".to_string()); + }; + + let reference_state = read_era_state::(&reference_path, &network_name, spec)?; + + // historical_roots was frozen in capella, and continued as historical_summaries + let historical_roots = reference_state.historical_roots().to_vec(); + // Pre-Capella states don't have historical_summaries property + let historical_summaries = match reference_state.historical_summaries() { + Ok(list) => list.to_vec(), + Err(_) => vec![], + }; + + let dir = era_files_dir.to_path_buf(); + let era_dir = Self { + dir, + network_name, + genesis_validators_root: reference_state.genesis_validators_root(), + historical_roots, + historical_summaries, + max_era, + }; + + // Verify that every expected era file name exists in the directory. + for era_number in 0..=era_dir.max_era { + let expected = era_dir.expected_path(era_number); + if !expected.exists() { + return Err(format!("missing era file: {expected:?}")); + } + } + + Ok(era_dir) + } + + pub fn max_era(&self) -> u64 { + self.max_era + } + + pub fn genesis_validators_root(&self) -> Hash256 { + self.genesis_validators_root + } + + #[instrument(level = "debug", skip_all, fields(era_number = %era_number))] + pub fn import_era_file, Cold: ItemStore>( + &self, + store: &HotColdDB, + era_number: u64, + spec: &ChainSpec, + trusted_state: Option<(Hash256, Slot)>, + ) -> Result<(), String> { + let path = self.expected_path(era_number); + debug!(?path, era_number, "Importing era file"); + let file = File::open(path).map_err(|error| format!("failed to open era file: {error}"))?; + let era_file = { + let _span = debug_span!("era_import_read").entered(); + EraReader::new(file) + .read_and_assemble(self.network_name.clone()) + .map_err(|error| format!("failed to parse era file: {error:?}"))? + }; + + // Consistency checks: ensure the era state matches the expected historical root and that + // each block root matches the state block_roots for its slot. + let mut state = { + let _span = debug_span!("era_import_decode_state").entered(); + decode_state::(era_file.group.era_state, spec)? + }; + + // Verify trusted state root if provided + if let Some((expected_root, expected_slot)) = trusted_state { + if state.slot() != expected_slot { + return Err(format!( + "trusted slot mismatch: expected {expected_slot}, got {}", + state.slot() + )); + } + let actual_root = state + .canonical_root() + .map_err(|e| format!("Failed to compute state root: {e:?}"))?; + if actual_root != expected_root { + return Err(format!( + "trusted state root mismatch at slot {expected_slot}: \ + expected {expected_root:?}, got {actual_root:?}" + )); + } + } + + let expected_root = self + .era_file_name_root(era_number) + .ok_or_else(|| format!("missing historical root for era {era_number}"))?; + let actual_root = era_root_from_state(&state, era_number)?; + if expected_root != actual_root { + return Err(format!( + "era root mismatch for era {era_number}: expected {expected_root:?}, got {actual_root:?}" + )); + } + + let slots_per_historical_root = E::slots_per_historical_root() as u64; + let _start_slot = Slot::new(era_number.saturating_sub(1) * slots_per_historical_root); + let end_slot = Slot::new(era_number * slots_per_historical_root); + if state.slot() != end_slot { + return Err(format!( + "era state slot mismatch: expected {end_slot}, got {}", + state.slot() + )); + } + + let historical_summaries_active_at_slot = match store.spec.capella_fork_epoch { + // For mainnet case, Capella activates at 194048 epoch = 6209536 slot = 758 era number. + // The last epoch processing before capella transition adds a last entry to historical + // roots. So historical_roots.len() == 758 at the capella fork boundary. An ERA file + // includes the state AFTER advanced (or applying) the block at the final slot, so the + // state for ERA file 758 is of the Capella variant. However, historical summaries are + // still empty. + Some(epoch) => { + epoch.start_slot(E::slots_per_epoch()) + + Slot::new(E::slots_per_historical_root() as u64) + } + None => Slot::max_value(), + }; + + // Check that the block roots vector in this state match the historical summary in the last + // state. Asserts that the blocks are exactly the expected ones given a trusted final state + if era_number == 0 { + // Skip checking genesis state era file for now + } else if state.slot() >= historical_summaries_active_at_slot { + // Post-capella state, check against historical summaries + // ```py + // historical_summary = HistoricalSummary( + // block_summary_root=hash_tree_root(state.block_roots), + // state_summary_root=hash_tree_root(state.state_roots), + // ) + // state.historical_summaries.append(historical_summary) + // ``` + let index = era_number.saturating_sub(1) as usize; + // historical_summaries started to be appended after capella, so we need to offset + let summary_index = index + .checked_sub(self.historical_roots.len()) + .ok_or_else(|| format!( + "Not enough historical roots era number {era_number} index {index} historical_roots len {}", + self.historical_roots.len() + ))?; + let expected_root = self + .historical_summaries + .get(summary_index) + .ok_or_else(|| format!("missing historical summary for era {era_number}"))? + .block_summary_root(); + let actual_root = state.block_roots().tree_hash_root(); + if actual_root != expected_root { + return Err(format!( + "block summary root post-capella mismatch for era {}: {:?} != {:?}", + era_number, expected_root, actual_root + )); + } + } else { + // Pre-capella state, check against historical roots + // ```py + // historical_batch = HistoricalBatch(block_roots=state.block_roots, state_roots=state.state_roots) + // state.historical_roots.append(hash_tree_root(historical_batch)) + // ``` + let index = era_number.saturating_sub(1) as usize; + let expected_root = *self + .historical_roots + .get(index) + .ok_or_else(|| format!("missing historical root for era {era_number}"))?; + let historical_batch = HistoricalBatch:: { + block_roots: state.block_roots().clone(), + state_roots: state.state_roots().clone(), + }; + let actual_root = historical_batch.tree_hash_root(); + if actual_root != expected_root { + return Err(format!( + "block summary root pre-capella mismatch for era {}: {:?} != {:?}", + era_number, expected_root, actual_root + )); + } + } + + debug!(era_number, "Importing blocks from era file"); + // TODO(era): Block signatures are not verified here and are trusted. + // decode and hash is split in two loops to track timings better. If we add spans for each + // block it's too short and the data is not really useful. + let decoded_blocks = { + let _span = debug_span!("era_import_decode_blocks").entered(); + era_file + .group + .blocks + .into_par_iter() + .map(|compressed_block| decode_block::(compressed_block, spec)) + .collect::, _>>()? + }; + let blocks_with_roots = { + let _span = debug_span!("era_import_hash_blocks").entered(); + decoded_blocks + .into_par_iter() + .map(|block| (block.canonical_root(), block)) + .collect::>() + }; + + let mut block_ops = vec![]; + { + let _ = debug_span!("era_import_db_ops_blocks").entered(); + for (block_root, block) in blocks_with_roots { + let slot = block.slot(); + // Check consistency that this block is expected w.r.t. the state in the era file. + // Since we check that the state block roots match the historical summary, we know that + // this block root is the expected one. + let expected_block_root = state + .get_block_root(slot) + .map_err(|error| format!("failed to read block root {slot}: {error:?}"))?; + if *expected_block_root != block_root { + return Err(format!( + "block root mismatch at slot {slot}: expected {expected_block_root:?}, got {block_root:?}" + )); + } + store + .block_as_kv_store_ops(&block_root, block, &mut block_ops) + .map_err(|error| format!("failed to store block: {error:?}"))?; + } + } + { + let _ = debug_span!("era_import_write_blocks").entered(); + store + .hot_db + .do_atomically(block_ops) + .map_err(|error| format!("failed to store blocks: {error:?}"))?; + } + + // Populate the cold DB slot -> block root index from the state.block_roots() + { + let _span = debug_span!("era_import_write_block_index").entered(); + write_block_root_index_for_era(store, &state, era_number)?; + } + + debug!(era_number, "Importing state from era file"); + { + let _span = debug_span!("era_import_write_state").entered(); + let state_root = state + .canonical_root() + .map_err(|error| format!("failed to hash state: {error:?}"))?; + // Use put_cold_state as the split is not updated and we need the state into the cold store. + store + .put_cold_state(&state_root, &state) + .map_err(|error| format!("failed to store state: {error:?}"))?; + } + Ok(()) + } + + fn expected_path(&self, era_number: u64) -> PathBuf { + let root = self + .era_file_name_root(era_number) + .unwrap_or_else(Hash256::zero); + let short = root + .as_slice() + .iter() + .take(4) + .map(|byte| format!("{byte:02x}")) + .collect::(); + let filename = format!("{}-{era_number:05}-{short}.era", self.network_name); + self.dir.join(filename) + } + + // era_file_name_root for file naming: + // short-era-root is the first 4 bytes of the last historical root in the last state in the + // era file, lower-case hex-encoded (8 characters), except the genesis era which instead + // uses the genesis_validators_root field from the genesis state. + // - The root is available as state.historical_roots[era - 1] except for genesis, which is + // state.genesis_validators_root + // - Post-Capella, the root must be computed from + // `state.historical_summaries[era - state.historical_roots.len - 1]` + fn era_file_name_root(&self, era_number: u64) -> Option { + if era_number == 0 { + return Some(self.genesis_validators_root); + } + let index = era_number.saturating_sub(1) as usize; + if let Some(root) = self.historical_roots.get(index) { + return Some(*root); + } + let summary_index = index.saturating_sub(self.historical_roots.len()); + self.historical_summaries + .get(summary_index) + .map(|summary| summary.tree_hash_root()) + } +} + +fn read_era_state( + path: &Path, + network_name: &str, + spec: &ChainSpec, +) -> Result, String> { + let file = File::open(path).map_err(|error| format!("failed to open era file: {error}"))?; + let era_file = EraReader::new(file) + .read_and_assemble(network_name.to_string()) + .map_err(|error| format!("failed to parse era file: {error:?}"))?; + decode_state::(era_file.group.era_state, spec) +} + +fn era_root_from_state( + state: &BeaconState, + era_number: u64, +) -> Result { + if era_number == 0 { + return Ok(state.genesis_validators_root()); + } + let index = era_number + .checked_sub(1) + .ok_or_else(|| "invalid era number".to_string())? as usize; + if let Some(root) = state.historical_roots().get(index) { + return Ok(*root); + } + if let Ok(summaries) = state.historical_summaries() { + let summary_index = index.saturating_sub(state.historical_roots().len()); + let summary = summaries + .get(summary_index) + .ok_or_else(|| "missing historical summary".to_string())?; + return Ok(summary.tree_hash_root()); + } + Err(format!("missing historical root for era {era_number}")) +} + +fn write_block_root_index_for_era, Cold: ItemStore>( + store: &HotColdDB, + state: &BeaconState, + era_number: u64, +) -> Result<(), String> { + let end_slot = state.slot(); + let slots_per_historical_root = E::slots_per_historical_root() as u64; + let expected_end_slot = Slot::new(era_number * slots_per_historical_root); + if end_slot != expected_end_slot { + return Err(format!( + "era state slot mismatch: expected {expected_end_slot}, got {end_slot}" + )); + } + + let start_slot = end_slot.saturating_sub(slots_per_historical_root); + + let ops = (start_slot.as_u64()..end_slot.as_u64()) + .map(|slot_u64| { + let slot = Slot::new(slot_u64); + let block_root = state + .get_block_root(slot) + .map_err(|error| format!("failed to read block root {slot}: {error:?}"))?; + // TODO(era): Should we write BeaconBlockRoots for missed slots? + Ok(KeyValueStoreOp::PutKeyValue( + DBColumn::BeaconBlockRoots, + slot_u64.to_be_bytes().to_vec(), + block_root.as_slice().to_vec(), + )) + }) + .collect::, String>>()?; + + store + .cold_db + .do_atomically(ops) + .map_err(|error| format!("failed to store block root index: {error:?}"))?; + + Ok(()) +} + +fn list_era_files(dir: &Path) -> Result, String> { + let entries = fs::read_dir(dir).map_err(|error| format!("failed to read era dir: {error}"))?; + let mut era_files = Vec::new(); + + for entry in entries { + let entry = entry.map_err(|error| format!("failed to read era entry: {error}"))?; + let path = entry.path(); + let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else { + continue; + }; + + if !file_name.ends_with(".era") { + continue; + } + + let Some((prefix, _hash_part)) = file_name.rsplit_once('-') else { + continue; + }; + let Some((_network_name, era_part)) = prefix.rsplit_once('-') else { + continue; + }; + let Some(era_number) = era_part.parse().ok() else { + continue; + }; + + era_files.push((era_number, path)); + } + + if era_files.is_empty() { + warn!(?dir, "Era files directory is empty"); + } + + Ok(era_files) +} diff --git a/beacon_node/beacon_chain/src/era/mod.rs b/beacon_node/beacon_chain/src/era/mod.rs new file mode 100644 index 0000000000..ee326261d7 --- /dev/null +++ b/beacon_node/beacon_chain/src/era/mod.rs @@ -0,0 +1,5 @@ +pub mod consumer; +pub mod producer; + +#[cfg(test)] +mod tests; diff --git a/beacon_node/beacon_chain/src/era/producer.rs b/beacon_node/beacon_chain/src/era/producer.rs new file mode 100644 index 0000000000..a5695c59f5 --- /dev/null +++ b/beacon_node/beacon_chain/src/era/producer.rs @@ -0,0 +1,323 @@ +use rand::random; +use reth_era::common::file_ops::{EraFileFormat, EraFileId, StreamWriter}; +use reth_era::era::file::{EraFile, EraWriter}; +use reth_era::era::types::consensus::{CompressedBeaconState, CompressedSignedBeaconBlock}; +use reth_era::era::types::group::{EraGroup, EraId, SlotIndex}; +use ssz::Encode; +use std::fs::{self, File, OpenOptions}; +use std::path::Path; +use store::{HotColdDB, ItemStore}; +use tracing::{error, info}; +use tree_hash::TreeHash; +use types::{BeaconState, EthSpec, Slot}; + +fn era_file_exists(dir: &Path, id: &EraId) -> bool { + dir.join(id.to_file_name()).exists() +} + +fn era_file_exists_for_number(dir: &Path, network_name: &str, era_number: u64) -> bool { + let prefix = format!("{}-{:05}-", network_name, era_number); + let Ok(entries) = fs::read_dir(dir) else { + return false; + }; + + for entry in entries.flatten() { + let file_name = entry.file_name(); + let Some(name) = file_name.to_str() else { + continue; + }; + if name.starts_with(&prefix) && name.ends_with(".era") { + return true; + } + } + false +} + +#[allow(dead_code)] +pub(crate) fn maybe_produce_reconstruction_eras< + E: EthSpec, + Hot: ItemStore, + Cold: ItemStore, +>( + db: &HotColdDB, + output_dir: &Path, +) { + let anchor = db.get_anchor_info(); + let max_era = anchor.state_lower_limit.as_u64() / E::slots_per_historical_root() as u64; + + for era_number in 0..=max_era { + if let Err(error) = create_era_file(db, era_number, output_dir) { + error!( + ?error, + era_number, "Era producer failed during reconstruction" + ); + break; + } + } +} + +#[allow(dead_code)] +pub(crate) fn maybe_produce_finalization_era, Cold: ItemStore>( + db: &HotColdDB, + output_dir: &Path, + finalized_slot: Slot, +) { + // This is the oldest slot for which we have a state and blocks available + let anchor_slot = db.get_anchor_info().anchor_slot; + // And finalized_slot is the most recent for which we have finalized state and blocks available + + // We can produce an era file for era_number if + // - anchor_slot <= start_slot(era_number) AND + // - finalized_slot >= end_slot(era_number) + let slots_per_hr = E::slots_per_historical_root() as u64; + let lowest_era_file = anchor_slot.as_u64() / slots_per_hr; + let Some(max_era_file) = ((finalized_slot.as_u64() + 1) / slots_per_hr).checked_sub(1) else { + return; + }; + for era_number in lowest_era_file..=max_era_file { + if let Err(error) = create_era_file(db, era_number, output_dir) { + error!( + ?error, + era_number, "Era producer failed during finalization" + ); + break; + } + } +} + +pub fn create_era_file, Cold: ItemStore>( + db: &HotColdDB, + era_number: u64, + output_dir: &Path, +) -> Result<(), String> { + let network_name = db + .spec + .config_name + .clone() + .unwrap_or_else(|| "unknown".to_string()); + if era_file_exists_for_number(output_dir, &network_name, era_number) { + return Ok(()); + } + + let end_slot = Slot::new(era_number * E::slots_per_historical_root() as u64); + + let mut state = db + .load_cold_state_by_slot(end_slot) + .map_err(|error| format!("failed to load era state: {error:?}"))?; + + if state.slot() != end_slot { + return Err(format!( + "era state slot mismatch: expected {}, got {}", + end_slot, + state.slot() + )); + } + + let group = build_era_group(db, &mut state, era_number)?; + let file_id = era_file_id::(&network_name, era_number, &mut state)?; + let file = EraFile::new(group, file_id); + + fs::create_dir_all(output_dir) + .map_err(|error| format!("failed to create era files dir: {error}"))?; + + if era_file_exists(output_dir, file.id()) { + return Ok(()); + } + + write_era_file_atomic(output_dir, &file)?; + + info!( + era_number, + file = %file.id().to_file_name(), + "Wrote era file" + ); + + Ok(()) +} + +fn build_era_group, Cold: ItemStore>( + db: &HotColdDB, + state: &mut BeaconState, + era_number: u64, +) -> Result { + // Era file 0 goes from slot 0 to 0, genesis state only + let start_slot = + Slot::new(era_number.saturating_sub(1) * E::slots_per_historical_root() as u64); + let end_slot = Slot::new(era_number * E::slots_per_historical_root() as u64); + + let compressed_state = CompressedBeaconState::from_ssz(&state.as_ssz_bytes()) + .map_err(|error| format!("failed to compress state: {error:?}"))?; + + // Each entry has an 8-byte header; the version record is header-only. + let mut offset: i64 = 8; + let mut blocks: Vec = Vec::new(); + let mut block_data_starts: Vec<(Slot, i64)> = Vec::new(); + + // The era file number 0 contains the genesis state and nothing else + if era_number > 0 { + for slot_u64 in start_slot.as_u64()..end_slot.as_u64() { + let slot = Slot::new(slot_u64); + let block_root = state + .get_block_root(slot) + .map_err(|error| format!("failed to read block root {slot}: {error:?}"))?; + + // Skip duplicate blocks (same root as previous slot), but only within this ERA + if slot_u64 > start_slot.as_u64() + && let Ok(prev_root) = state.get_block_root(Slot::new(slot_u64 - 1)) + && prev_root == block_root + { + continue; + } + + let block = db + .get_full_block(block_root) + .map_err(|error| format!("failed to load block: {error:?}"))? + .ok_or_else(|| format!("missing block for root {block_root:?}"))?; + + let compressed = CompressedSignedBeaconBlock::from_ssz(&block.as_ssz_bytes()) + .map_err(|error| format!("failed to compress block: {error:?}"))?; + + let data_len = compressed.data.len() as i64; + let data_start = offset + 8; + blocks.push(compressed); + block_data_starts.push((slot, data_start)); + offset += 8 + data_len; + } + } + + let state_data_len = compressed_state.data.len() as i64; + // Data starts after the 8-byte header. + let state_data_start = offset + 8; + offset += 8 + state_data_len; + + let block_index_start = offset; + let slot_count = E::slots_per_historical_root(); + // SlotIndex layout: starting_slot (8) + offsets (slot_count * 8) + count (8). + let block_index_len = 8 + slot_count as i64 * 8 + 8; + if era_number > 0 { + offset += 8 + block_index_len; + } + + let state_index_start = offset; + // Offset is relative to the start of the slot index data (after the 8-byte header) + let state_offset = state_data_start - (state_index_start + 8); + let state_slot_index = SlotIndex::new(end_slot.as_u64(), vec![state_offset]); + + let group = if era_number > 0 { + let mut offsets = vec![0i64; slot_count]; + for (slot, data_start) in &block_data_starts { + let slot_index = slot + .as_u64() + .checked_sub(start_slot.as_u64()) + .ok_or_else(|| "slot underflow while building block index".to_string())? + as usize; + offsets[slot_index] = *data_start - block_index_start; + } + let block_index = SlotIndex::new(start_slot.as_u64(), offsets); + EraGroup::with_block_index(blocks, compressed_state, block_index, state_slot_index) + } else { + EraGroup::new(blocks, compressed_state, state_slot_index) + }; + Ok(group) +} + +fn short_historical_root( + state: &mut BeaconState, + era_number: u64, +) -> Result<[u8; 4], String> { + let root = if era_number == 0 { + state.genesis_validators_root() + } else { + let era_index = era_number + .checked_sub(1) + .ok_or_else(|| "era index underflow".to_string())?; + let roots_len = state.historical_roots_mut().len(); + if era_index < roots_len as u64 { + *state + .historical_roots_mut() + .get(era_index as usize) + .ok_or_else(|| "historical root missing".to_string())? + } else { + let summary_index = era_index + .checked_sub(roots_len as u64) + .ok_or_else(|| "historical summary index underflow".to_string())?; + let summaries = state + .historical_summaries_mut() + .map_err(|error| format!("failed to access historical summaries: {error:?}"))?; + let summary = summaries + .get(summary_index as usize) + .ok_or_else(|| "historical summary missing".to_string())?; + summary.tree_hash_root() + } + }; + + let mut short_hash = [0u8; 4]; + short_hash.copy_from_slice(&root.as_slice()[..4]); + Ok(short_hash) +} + +/// Write an era file atomically using a temp file + rename. +/// +/// If the process crashes mid-write, only the temp file is left behind; the final file is +/// created via rename, so it is either complete or absent. The era file existence check only +/// considers `.era` files, so partial temp files are ignored safely. +fn write_era_file_atomic(output_dir: &Path, file: &EraFile) -> Result<(), String> { + let filename = file.id().to_file_name(); + let final_path = output_dir.join(&filename); + if final_path.exists() { + return Ok(()); + } + + // Create a unique temp file and write the full era contents into it. + let tmp_name = format!("{filename}.tmp-{:016x}", random::()); + let tmp_path = output_dir.join(tmp_name); + let mut file_handle = OpenOptions::new() + .write(true) + .create_new(true) + .open(&tmp_path) + .map_err(|error| format!("failed to create temp file: {error}"))?; + { + let mut writer = EraWriter::new(&mut file_handle); + writer + .write_file(file) + .map_err(|error| format!("failed to write era file: {error:?}"))?; + } + file_handle + .sync_all() + .map_err(|error| format!("failed to fsync era temp file: {error}"))?; + + // Atomically publish; if another writer won, clean up and exit. + if let Err(error) = fs::rename(&tmp_path, &final_path) { + if error.kind() == std::io::ErrorKind::AlreadyExists && final_path.exists() { + let _ = fs::remove_file(&tmp_path); + return Ok(()); + } + return Err(format!("failed to rename era temp file: {error}")); + } + + // Best-effort directory sync to make the rename durable. + if let Ok(dir_handle) = File::open(output_dir) { + let _ = dir_handle.sync_all(); + } + + Ok(()) +} + +fn era_file_id( + network_name: &str, + era_number: u64, + state: &mut BeaconState, +) -> Result { + // reth_era uses hardcoded SLOTS_PER_HISTORICAL_ROOT=8192 to compute era number from start_slot. + // To get the correct filename era number, we pass era_number * 8192 as the start_slot. + const RETH_SLOTS_PER_HISTORICAL_ROOT: u64 = 8192; + let reth_start_slot = era_number * RETH_SLOTS_PER_HISTORICAL_ROOT; + + let slot_count = if era_number == 0 { + 0 + } else { + E::slots_per_historical_root() as u32 + }; + let short_hash = short_historical_root(state, era_number)?; + Ok(EraId::new(network_name, reth_start_slot, slot_count).with_hash(short_hash)) +} diff --git a/beacon_node/beacon_chain/src/era/tests.rs b/beacon_node/beacon_chain/src/era/tests.rs new file mode 100644 index 0000000000..23b633cc97 --- /dev/null +++ b/beacon_node/beacon_chain/src/era/tests.rs @@ -0,0 +1,445 @@ +/// ERA file consumer + producer tests using minimal preset test vectors. +/// +/// Test vectors: 13 ERA files from a Nimbus minimal testnet. +/// - Electra from genesis, Fulu at epoch 100000 +/// - SLOTS_PER_HISTORICAL_ROOT = 64 (one ERA = 64 slots = 8 epochs) +/// - 13 ERA files covering 832 slots, 767 blocks, 1024 validators +use super::consumer::EraFileDir; +use reth_era::common::file_ops::StreamReader; +use std::path::PathBuf; +use std::sync::Arc; +use store::{DBColumn, HotColdDB, KeyValueStore, StoreConfig}; +use types::{BeaconState, ChainSpec, Config, EthSpec, Hash256, MinimalEthSpec}; + +fn test_vectors_dir() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests") + .join("era_test_vectors") +} + +fn load_test_spec() -> ChainSpec { + let config_str = + std::fs::read_to_string(test_vectors_dir().join("config.yaml")).expect("read config.yaml"); + let config: Config = serde_yaml::from_str(&config_str).expect("parse config"); + config + .apply_to_chain_spec::(&ChainSpec::minimal()) + .expect("apply config") +} + +fn load_genesis_state(spec: &ChainSpec) -> BeaconState { + // Extract genesis state from ERA 0 file + let era_dir = test_vectors_dir().join("era"); + let era0_path = std::fs::read_dir(&era_dir) + .expect("read era dir") + .filter_map(|e| e.ok()) + .find(|e| e.file_name().to_string_lossy().contains("-00000-")) + .expect("ERA 0 file must exist"); + let file = std::fs::File::open(era0_path.path()).expect("open ERA 0"); + let era = reth_era::era::file::EraReader::new(file) + .read_and_assemble("minimal".to_string()) + .expect("parse ERA 0"); + let state_bytes = era + .group + .era_state + .decompress() + .expect("decompress ERA 0 state"); + BeaconState::from_ssz_bytes(&state_bytes, spec).expect("decode genesis state from ERA 0") +} + +type TestStore = HotColdDB< + MinimalEthSpec, + store::MemoryStore, + store::MemoryStore, +>; + +/// Import all ERA files into a fresh ephemeral store. +fn import_all_era_files() -> (TestStore, ChainSpec, u64) { + let spec = load_test_spec(); + let era_dir_path = test_vectors_dir().join("era"); + let era_dir = EraFileDir::new::(&era_dir_path, &spec).expect("open ERA dir"); + let max_era = era_dir.max_era(); + + let store = HotColdDB::open_ephemeral(StoreConfig::default(), Arc::new(spec.clone())) + .expect("create store"); + + let mut genesis_state = load_genesis_state(&spec); + let root = genesis_state.canonical_root().expect("hash genesis"); + let mut ops = vec![]; + store + .store_cold_state(&root, &genesis_state, &mut ops) + .expect("build ops"); + store.cold_db.do_atomically(ops).expect("write genesis"); + + for era in 1..=max_era { + era_dir + .import_era_file(&store, era, &spec, None) + .unwrap_or_else(|e| panic!("import ERA {era}: {e}")); + } + + (store, spec, max_era) +} + +/// Create ephemeral store with genesis state only. +fn empty_store(spec: &ChainSpec) -> TestStore { + let store = + HotColdDB::open_ephemeral(StoreConfig::default(), Arc::new(spec.clone())).expect("store"); + let mut genesis = load_genesis_state(spec); + let root = genesis.canonical_root().expect("hash"); + let mut ops = vec![]; + store + .store_cold_state(&root, &genesis, &mut ops) + .expect("ops"); + store.cold_db.do_atomically(ops).expect("write"); + store +} + +/// Copy ERA files to temp dir, replacing one with a corrupt version. +fn era_dir_with_corrupt(corrupt_file: &str, target_pattern: &str) -> tempfile::TempDir { + let tmp = tempfile::TempDir::new().expect("tmp"); + let src = test_vectors_dir(); + let dst = tmp.path().join("era"); + std::fs::create_dir_all(&dst).expect("mkdir"); + + for entry in std::fs::read_dir(src.join("era")).expect("readdir") { + let entry = entry.expect("entry"); + let name = entry.file_name().to_string_lossy().to_string(); + if name.contains(target_pattern) { + std::fs::copy(src.join("corrupt").join(corrupt_file), dst.join(&name)) + .expect("copy corrupt"); + } else { + std::fs::copy(entry.path(), dst.join(&name)).expect("copy"); + } + } + tmp +} + +// ============================================================================= +// CONSUMER TEST +// ============================================================================= + +/// Import all ERA files, verify block hash chain and state roots. +#[test] +fn era_consumer_imports_and_verifies() { + let (store, spec, max_era) = import_all_era_files(); + let slots_per_era = MinimalEthSpec::slots_per_historical_root() as u64; + let max_slot = max_era * slots_per_era; + + // Collect all blocks by reading block root index, then fetching from store + let mut blocks_by_slot = std::collections::BTreeMap::new(); + let mut seen_roots = std::collections::HashSet::new(); + + for slot in 0..max_slot { + let key = slot.to_be_bytes().to_vec(); + if let Some(root_bytes) = store + .cold_db + .get_bytes(DBColumn::BeaconBlockRoots, &key) + .expect("read index") + { + let block_root = Hash256::from_slice(&root_bytes); + if seen_roots.insert(block_root) { + let block = store + .get_full_block(&block_root) + .expect("query") + .unwrap_or_else(|| panic!("block missing at slot {slot}")); + assert_eq!( + block.canonical_root(), + block_root, + "block root mismatch at slot {slot}" + ); + blocks_by_slot.insert(slot, block); + } + } + } + + assert!( + blocks_by_slot.len() > 700, + "expected >700 blocks, got {}", + blocks_by_slot.len() + ); + + // Verify parent_root chain: each block's parent_root must equal the previous block's root + let slots: Vec<_> = blocks_by_slot.keys().copied().collect(); + for i in 1..slots.len() { + let block = &blocks_by_slot[&slots[i]]; + let prev_block = &blocks_by_slot[&slots[i - 1]]; + assert_eq!( + block.message().parent_root(), + prev_block.canonical_root(), + "broken hash chain at slot {}: parent_root doesn't match previous block root", + slots[i] + ); + } + + // Verify boundary states match ERA file state roots + let era_dir_path = test_vectors_dir().join("era"); + let mut era_files: Vec<_> = std::fs::read_dir(&era_dir_path) + .expect("readdir") + .filter_map(|e| e.ok()) + .filter(|e| e.file_name().to_string_lossy().ends_with(".era")) + .collect(); + era_files.sort_by_key(|e| e.file_name()); + + for entry in &era_files { + let file = std::fs::File::open(entry.path()).expect("open"); + let era = reth_era::era::file::EraReader::new(file) + .read_and_assemble("minimal".to_string()) + .expect("parse"); + let state_bytes = era.group.era_state.decompress().expect("decompress"); + let mut era_state: BeaconState = + BeaconState::from_ssz_bytes(&state_bytes, &spec).expect("decode"); + let expected_root = era_state.canonical_root().expect("root"); + let slot = era_state.slot(); + + // Load state from store and verify root matches + let mut stored_state = store.load_cold_state_by_slot(slot).expect("load state"); + assert_eq!( + stored_state.slot(), + slot, + "stored state slot mismatch at slot {slot}" + ); + let stored_root = stored_state.canonical_root().expect("root"); + assert_eq!( + stored_root, expected_root, + "state root mismatch at slot {slot}" + ); + } +} + +// ============================================================================= +// PRODUCER TEST — byte-identical output +// ============================================================================= + +#[test] +fn era_producer_output_is_byte_identical() { + let (store, _spec, max_era) = import_all_era_files(); + let output = PathBuf::from("/tmp/era_producer_test_output"); + let _ = std::fs::remove_dir_all(&output); + std::fs::create_dir_all(&output).expect("mkdir"); + + for era in 0..=max_era { + super::producer::create_era_file(&store, era, &output) + .unwrap_or_else(|e| panic!("produce ERA {era}: {e}")); + } + + let mut originals: Vec<_> = std::fs::read_dir(test_vectors_dir().join("era")) + .expect("readdir") + .filter_map(|e| e.ok()) + .filter(|e| e.file_name().to_string_lossy().ends_with(".era")) + .collect(); + originals.sort_by_key(|e| e.file_name()); + + let mut produced: Vec<_> = std::fs::read_dir(&output) + .expect("readdir") + .filter_map(|e| e.ok()) + .filter(|e| e.file_name().to_string_lossy().ends_with(".era")) + .collect(); + produced.sort_by_key(|e| e.file_name()); + + assert_eq!(originals.len(), produced.len(), "file count mismatch"); + + for (orig, prod) in originals.iter().zip(produced.iter()) { + assert_eq!( + std::fs::read(orig.path()).expect("read"), + std::fs::read(prod.path()).expect("read"), + "ERA mismatch: {:?}", + orig.file_name() + ); + } +} + +// ============================================================================= +// CORRUPTION TESTS — verify specific error messages +// ============================================================================= + +fn assert_import_fails( + corrupt_file: &str, + target_pattern: &str, + target_era: u64, + expected_err: &str, +) { + let tmp = era_dir_with_corrupt(corrupt_file, target_pattern); + let spec = load_test_spec(); + let era_dir = EraFileDir::new::(&tmp.path().join("era"), &spec) + .expect("init should succeed"); + let store = empty_store(&spec); + + for era in 0..target_era { + era_dir + .import_era_file(&store, era, &spec, None) + .unwrap_or_else(|e| panic!("ERA {era}: {e}")); + } + + let err = era_dir + .import_era_file(&store, target_era, &spec, None) + .unwrap_err(); + assert!( + err.contains(expected_err), + "expected \"{expected_err}\", got: {err}" + ); +} + +#[test] +fn era_rejects_corrupted_block_decompression() { + assert_import_fails("era1-corrupt-block.era", "-00001-", 1, "decompress"); +} + +#[test] +fn era_rejects_corrupted_genesis_state() { + assert_import_fails("era0-corrupt-state.era", "-00000-", 0, "decompress"); +} + +#[test] +fn era_rejects_corrupted_middle_state() { + assert_import_fails("era5-corrupt-state.era", "-00005-", 5, "decompress"); +} + +#[test] +fn era_rejects_corrupted_reference_state() { + let tmp = era_dir_with_corrupt("era12-corrupt-state.era", "-00012-"); + let spec = load_test_spec(); + match EraFileDir::new::(&tmp.path().join("era"), &spec) { + Ok(_) => panic!("should fail with corrupted reference state"), + Err(err) => assert!(err.contains("decompress"), "expected decompress: {err}"), + } +} + +#[test] +fn era_rejects_wrong_era_content() { + assert_import_fails( + "era3-wrong-content.era", + "-00003-", + 3, + "era state slot mismatch", + ); +} + +#[test] +fn era_rejects_wrong_era_root() { + assert_import_fails("era0-wrong-root.era", "-00000-", 0, "era root mismatch"); +} + +#[test] +fn era_rejects_corrupt_block_summary() { + assert_import_fails( + "era8-corrupt-block-summary.era", + "-00008-", + 8, + "block summary root post-capella mismatch", + ); +} + +#[test] +fn era_rejects_wrong_block_root() { + assert_import_fails( + "era2-wrong-block-root.era", + "-00002-", + 2, + "block root mismatch", + ); +} + +/// Mutated balance in ERA 3 state → state root doesn't match trusted root. +/// Without a trusted root, the consumer can't detect this (historical_summaries only +/// commit to block_roots/state_roots vectors, not full state content). +/// The trusted state root feature catches it. +#[test] +fn era_rejects_mutated_state_with_trusted_root() { + let tmp = era_dir_with_corrupt("era3-wrong-state-root.era", "-00003-"); + let spec = load_test_spec(); + let era_dir = EraFileDir::new::(&tmp.path().join("era"), &spec) + .expect("init should succeed"); + let store = empty_store(&spec); + + for era in 0..3 { + era_dir + .import_era_file(&store, era, &spec, None) + .unwrap_or_else(|e| panic!("ERA {era}: {e}")); + } + + // Get the CORRECT state root from the original ERA 3 file + let orig_era3 = std::fs::read_dir(test_vectors_dir().join("era")) + .expect("readdir") + .filter_map(|e| e.ok()) + .find(|e| e.file_name().to_string_lossy().contains("-00003-")) + .expect("ERA 3"); + let file = std::fs::File::open(orig_era3.path()).expect("open"); + let era = reth_era::era::file::EraReader::new(file) + .read_and_assemble("minimal".to_string()) + .expect("parse"); + let state_bytes = era.group.era_state.decompress().expect("decompress"); + let mut state: BeaconState = + BeaconState::from_ssz_bytes(&state_bytes, &spec).expect("decode"); + let correct_root = state.canonical_root().expect("root"); + let slot = state.slot(); + + // Import mutated ERA 3 with trusted root → should fail because balance was changed + let err = era_dir + .import_era_file(&store, 3, &spec, Some((correct_root, slot))) + .unwrap_err(); + assert!( + err.contains("trusted state root mismatch"), + "expected trusted state root mismatch: {err}" + ); +} + +// ============================================================================= +// TRUSTED STATE ROOT VERIFICATION +// ============================================================================= + +#[test] +fn era_rejects_wrong_trusted_state_root() { + let spec = load_test_spec(); + let store = empty_store(&spec); + let era_dir_path = test_vectors_dir().join("era"); + let era_dir = EraFileDir::new::(&era_dir_path, &spec).expect("open"); + + for era in 0..=2 { + era_dir + .import_era_file(&store, era, &spec, None) + .unwrap_or_else(|e| panic!("ERA {era}: {e}")); + } + + // Get correct state root for ERA 3 + let era3_file = std::fs::read_dir(&era_dir_path) + .expect("readdir") + .filter_map(|e| e.ok()) + .find(|e| e.file_name().to_string_lossy().contains("-00003-")) + .expect("ERA 3"); + + let file = std::fs::File::open(era3_file.path()).expect("open"); + let era = reth_era::era::file::EraReader::new(file) + .read_and_assemble("minimal".to_string()) + .expect("parse"); + let state_bytes = era.group.era_state.decompress().expect("decompress"); + let mut state: BeaconState = + BeaconState::from_ssz_bytes(&state_bytes, &spec).expect("decode"); + let correct_root = state.canonical_root().expect("root"); + let slot = state.slot(); + + // Correct root passes + era_dir + .import_era_file(&store, 3, &spec, Some((correct_root, slot))) + .expect("correct root should pass"); + + // Wrong root fails + let wrong_root = { + let mut bytes: [u8; 32] = correct_root.into(); + bytes[0] ^= 0x01; + Hash256::from(bytes) + }; + + let store2 = empty_store(&spec); + for era in 0..=2 { + era_dir + .import_era_file(&store2, era, &spec, None) + .unwrap_or_else(|e| panic!("ERA {era}: {e}")); + } + + let err = era_dir + .import_era_file(&store2, 3, &spec, Some((wrong_root, slot))) + .unwrap_err(); + assert!( + err.contains("trusted state root mismatch"), + "expected trusted state root mismatch: {err}" + ); +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index e1a190ffb3..62fa18060e 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -21,6 +21,7 @@ pub mod custody_context; pub mod data_availability_checker; pub mod data_column_verification; mod early_attester_cache; +pub mod era; mod errors; pub mod events; pub mod execution_payload; diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/README.md b/beacon_node/beacon_chain/tests/era_test_vectors/README.md new file mode 100644 index 0000000000..400a6c5e2e --- /dev/null +++ b/beacon_node/beacon_chain/tests/era_test_vectors/README.md @@ -0,0 +1,37 @@ +# ERA File Test Vectors + +Minimal preset test vectors for ERA file import/export testing. + +## Network Configuration + +- **Preset**: minimal (SLOTS_PER_EPOCH=8, SLOTS_PER_HISTORICAL_ROOT=64) +- **One ERA file** = 64 slots = 8 epochs +- **Validators**: 1024 +- **Fork schedule**: All forks active from genesis (Electra), Fulu at epoch 100000 + +## Contents + +- `config.yaml` — Network configuration (CL fork schedule + parameters) +- `genesis.ssz` — Genesis state (SSZ encoded) +- `era/` — 13 ERA files (minimal-00000 through minimal-00012) + - 832 slots total (epochs 0-103) + - ~2.4MB compressed + +## Generation + +Generated using Nimbus `launch_local_testnet.sh` with `--preset minimal --nodes 2 --stop-at-epoch 100 --run-geth --run-spamoor`, then exported via `ncli_db exportEra`. + +ERA files contain real blocks with execution payloads (transactions generated by spamoor). + +## Test Coverage + +### Consumer Tests (4 tests) +- `era_consumer_imports_all_files` — Imports all 13 ERA files into a fresh store, verifies 768 block root index entries +- `era_consumer_blocks_are_readable` — Verifies all 767 unique blocks are loadable from the store +- `era_consumer_genesis_state_intact` — Verifies genesis state with 1024 validators +- `era_files_are_parseable` — Verifies all ERA files can be parsed by reth_era library + +### Producer Test (1 test) +- `era_producer_generates_identical_files` — Re-exports ERA files from imported data and verifies byte-for-byte match with original Nimbus-generated files + +All tests passing ✅ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/config.yaml b/beacon_node/beacon_chain/tests/era_test_vectors/config.yaml new file mode 100644 index 0000000000..a1895239ee --- /dev/null +++ b/beacon_node/beacon_chain/tests/era_test_vectors/config.yaml @@ -0,0 +1,186 @@ +# This file should contain the origin run-time config for the minimal +# network [1] without all properties overriden in the local network +# simulation. We use to generate a full run-time config as required +# by third-party binaries, such as Lighthouse and Web3Signer. +# +# [1]: https://raw.githubusercontent.com/ethereum/consensus-specs/dev/configs/minimal.yaml + +# Minimal config + +# Extends the minimal preset +# (overriden in launch_local_testnet.sh) PRESET_BASE: 'minimal' + +# Free-form short name of the network that this configuration applies to - known +# canonical network names include: +# * 'mainnet' - there can be only one +# * 'prater' - testnet +# Must match the regex: [a-z0-9\-] +CONFIG_NAME: 'minimal' + +# Transition +# --------------------------------------------------------------- +# 2**256-2**10 for testing minimal network +# (overriden in launch_local_testnet.sh) TERMINAL_TOTAL_DIFFICULTY: 115792089237316195423570985008687907853269984665640564039457584007913129638912 +# By default, don't use these params +TERMINAL_BLOCK_HASH: 0x0000000000000000000000000000000000000000000000000000000000000000 +TERMINAL_BLOCK_HASH_ACTIVATION_EPOCH: 18446744073709551615 + + + +# Genesis +# --------------------------------------------------------------- +# [customized] +# (overriden in launch_local_testnet.sh) MIN_GENESIS_ACTIVE_VALIDATOR_COUNT: 64 +# Jan 3, 2020 +# (overriden in launch_local_testnet.sh) MIN_GENESIS_TIME: 1578009600 +# Highest byte set to 0x01 to avoid collisions with mainnet versioning +GENESIS_FORK_VERSION: 0x00000001 +# [customized] Faster to spin up testnets, but does not give validator reasonable warning time for genesis +# (overriden in launch_local_testnet.sh) GENESIS_DELAY: 300 + + +# Forking +# --------------------------------------------------------------- +# Values provided for illustrative purposes. +# Individual tests/testnets may set different values. + +# Altair +ALTAIR_FORK_VERSION: 0x01000001 +# (overriden in launch_local_testnet.sh) ALTAIR_FORK_EPOCH: 18446744073709551615 +# Bellatrix +BELLATRIX_FORK_VERSION: 0x02000001 +# (overriden in launch_local_testnet.sh) BELLATRIX_FORK_EPOCH: 18446744073709551615 +# Capella +CAPELLA_FORK_VERSION: 0x03000001 +# (overriden in launch_local_testnet.sh) CAPELLA_FORK_EPOCH: 18446744073709551615 +# Deneb +DENEB_FORK_VERSION: 0x04000001 +# (overriden in launch_local_testnet.sh) DENEB_FORK_EPOCH: 18446744073709551615 +# Electra +ELECTRA_FORK_VERSION: 0x05000001 +# (overriden in launch_local_testnet.sh) ELECTRA_FORK_EPOCH: 18446744073709551615 +# Fulu +FULU_FORK_VERSION: 0x06000001 +# (overriden in launch_local_testnet.sh) FULU_FORK_EPOCH: 18446744073709551615 + +# Time parameters +# --------------------------------------------------------------- +# [customized] Faster for testing purposes +SECONDS_PER_SLOT: 6 +# 14 (estimate from Eth1 mainnet) +SECONDS_PER_ETH1_BLOCK: 14 +# 2**8 (= 256) epochs +MIN_VALIDATOR_WITHDRAWABILITY_DELAY: 256 +# [customized] higher frequency of committee turnover and faster time to acceptable voluntary exit +SHARD_COMMITTEE_PERIOD: 64 +# [customized] process deposits more quickly, but insecure +# (overriden in launch_local_testnet.sh) ETH1_FOLLOW_DISTANCE: 16 + + +# Validator cycle +# --------------------------------------------------------------- +# 2**2 (= 4) +INACTIVITY_SCORE_BIAS: 4 +# 2**4 (= 16) +INACTIVITY_SCORE_RECOVERY_RATE: 16 +# 2**4 * 10**9 (= 16,000,000,000) Gwei +EJECTION_BALANCE: 16000000000 +# [customized] more easily demonstrate the difference between this value and the activation churn limit +MIN_PER_EPOCH_CHURN_LIMIT: 2 +# [customized] scale queue churn at much lower validator counts for testing +CHURN_LIMIT_QUOTIENT: 32 +# [New in Deneb:EIP7514] [customized] +MAX_PER_EPOCH_ACTIVATION_CHURN_LIMIT: 4 + + +# Fork choice +# --------------------------------------------------------------- +# 40% +PROPOSER_SCORE_BOOST: 40 +# 20% +REORG_HEAD_WEIGHT_THRESHOLD: 20 +# 160% +REORG_PARENT_WEIGHT_THRESHOLD: 160 +# `2` epochs +REORG_MAX_EPOCHS_SINCE_FINALIZATION: 2 + + +# Deposit contract +# --------------------------------------------------------------- +# Ethereum Goerli testnet +DEPOSIT_CHAIN_ID: 5 +DEPOSIT_NETWORK_ID: 5 +# Configured on a per testnet basis +# (overriden in launch_local_testnet.sh) DEPOSIT_CONTRACT_ADDRESS: 0x1234567890123456789012345678901234567890 + + +# Networking +# --------------------------------------------------------------- +# `10 * 2**20` (= 10485760, 10 MiB) +MAX_PAYLOAD_SIZE: 10485760 +# `2**10` (= 1024) +MAX_REQUEST_BLOCKS: 1024 +# `2**8` (= 256) +EPOCHS_PER_SUBNET_SUBSCRIPTION: 256 +# [customized] `MIN_VALIDATOR_WITHDRAWABILITY_DELAY + CHURN_LIMIT_QUOTIENT // 2` (= 272) +MIN_EPOCHS_FOR_BLOCK_REQUESTS: 272 +ATTESTATION_PROPAGATION_SLOT_RANGE: 32 +# 500ms +MAXIMUM_GOSSIP_CLOCK_DISPARITY: 500 +MESSAGE_DOMAIN_INVALID_SNAPPY: 0x00000000 +MESSAGE_DOMAIN_VALID_SNAPPY: 0x01000000 +# 2 subnets per node +SUBNETS_PER_NODE: 2 +# 2**8 (= 64) +ATTESTATION_SUBNET_COUNT: 64 +ATTESTATION_SUBNET_EXTRA_BITS: 0 +# ceillog2(ATTESTATION_SUBNET_COUNT) + ATTESTATION_SUBNET_EXTRA_BITS +ATTESTATION_SUBNET_PREFIX_BITS: 6 + +# Deneb +# `2**7` (=128) +MAX_REQUEST_BLOCKS_DENEB: 128 +# `2**12` (= 4096 epochs, ~18 days) +MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: 4096 +# `6` +BLOB_SIDECAR_SUBNET_COUNT: 6 +## `uint64(6)` +MAX_BLOBS_PER_BLOCK: 6 +# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK +MAX_REQUEST_BLOB_SIDECARS: 768 + +# Electra +# [customized] 2**6 * 10**9 (= 64,000,000,000) +MIN_PER_EPOCH_CHURN_LIMIT_ELECTRA: 64000000000 +# [customized] 2**7 * 10**9 (= 128,000,000,000) +MAX_PER_EPOCH_ACTIVATION_EXIT_CHURN_LIMIT: 128000000000 +# `9` +BLOB_SIDECAR_SUBNET_COUNT_ELECTRA: 9 +# `uint64(9)` +MAX_BLOBS_PER_BLOCK_ELECTRA: 9 +# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA +MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152 + +# Fulu +NUMBER_OF_COLUMNS: 128 +NUMBER_OF_CUSTODY_GROUPS: 128 +DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 +MAX_REQUEST_DATA_COLUMN_SIDECARS: 16384 +SAMPLES_PER_SLOT: 8 +CUSTODY_REQUIREMENT: 4 +VALIDATOR_CUSTODY_REQUIREMENT: 8 +BALANCE_PER_ADDITIONAL_CUSTODY_GROUP: 32000000000 +MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS: 4096 +PRESET_BASE: minimal +MIN_GENESIS_ACTIVE_VALIDATOR_COUNT: 1024 +MIN_GENESIS_TIME: 0 +GENESIS_DELAY: 10 +DEPOSIT_CONTRACT_ADDRESS: 0x4242424242424242424242424242424242424242 +ETH1_FOLLOW_DISTANCE: 1 +ALTAIR_FORK_EPOCH: 0 +BELLATRIX_FORK_EPOCH: 0 +CAPELLA_FORK_EPOCH: 0 +DENEB_FORK_EPOCH: 0 +ELECTRA_FORK_EPOCH: 0 +FULU_FORK_EPOCH: 100000 +TERMINAL_TOTAL_DIFFICULTY: 0 diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era0-corrupt-state.era b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era0-corrupt-state.era new file mode 100644 index 0000000000..41b15550a1 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era0-corrupt-state.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era0-wrong-root.era b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era0-wrong-root.era new file mode 100644 index 0000000000..ec1aedafc7 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era0-wrong-root.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era1-corrupt-block.era b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era1-corrupt-block.era new file mode 100644 index 0000000000..fdb1aac68f Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era1-corrupt-block.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era12-corrupt-state.era b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era12-corrupt-state.era new file mode 100644 index 0000000000..f291dcad89 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era12-corrupt-state.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era2-wrong-block-root.era b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era2-wrong-block-root.era new file mode 100644 index 0000000000..0f28602486 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era2-wrong-block-root.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era3-wrong-content.era b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era3-wrong-content.era new file mode 100644 index 0000000000..472496b6e9 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era3-wrong-content.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era3-wrong-state-root.era b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era3-wrong-state-root.era new file mode 100644 index 0000000000..58eb414112 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era3-wrong-state-root.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era5-corrupt-state.era b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era5-corrupt-state.era new file mode 100644 index 0000000000..3b5bb34dec Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era5-corrupt-state.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era8-corrupt-block-summary.era b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era8-corrupt-block-summary.era new file mode 100644 index 0000000000..e6e0785481 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/corrupt/era8-corrupt-block-summary.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/create_corrupt_files.py b/beacon_node/beacon_chain/tests/era_test_vectors/create_corrupt_files.py new file mode 100644 index 0000000000..97e555444a --- /dev/null +++ b/beacon_node/beacon_chain/tests/era_test_vectors/create_corrupt_files.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +""" +Create corrupted ERA files for testing ERA consumer error handling. + +This script generates specific corrupt ERA files by: +1. Parsing existing ERA files +2. Modifying specific parts (block data, state data) +3. Re-encoding with valid compression + +Requires: pip install python-snappy +""" +import os +import sys +import shutil +from pathlib import Path + +try: + import snappy +except ImportError: + print("ERROR: python-snappy not installed. Run: pip install python-snappy", file=sys.stderr) + sys.exit(1) + +SCRIPT_DIR = Path(__file__).parent +ERA_DIR = SCRIPT_DIR / "era" +CORRUPT_DIR = SCRIPT_DIR / "corrupt" + +def read_era_file(path): + """Read ERA file and return raw bytes.""" + with open(path, 'rb') as f: + return f.read() + +def find_era_file(pattern): + """Find ERA file matching pattern.""" + files = list(ERA_DIR.glob(f"minimal-{pattern}-*.era")) + if not files: + return None + return files[0] + +def corrupt_bytes_at_offset(data, offset, xor_pattern=0xFF): + """Corrupt bytes at specific offset by XOR.""" + result = bytearray(data) + result[offset] ^= xor_pattern + result[offset + 1] ^= xor_pattern + return bytes(result) + +def main(): + print("Creating corrupt ERA test files...\n") + CORRUPT_DIR.mkdir(exist_ok=True) + + # Test 1: ERA root mismatch - corrupt genesis_validators_root in ERA 0 + era0 = find_era_file("00000") + if era0: + data = read_era_file(era0) + # Corrupt bytes in the state section (after 16-byte header) + # The state is compressed, so corruption will propagate through state root + corrupt_data = corrupt_bytes_at_offset(data, 16 + 50) + output = CORRUPT_DIR / "era0-wrong-root.era" + with open(output, 'wb') as f: + f.write(corrupt_data) + print(f"✓ Created era0-wrong-root.era ({len(corrupt_data)} bytes)") + else: + print("⚠ ERA 0 file not found, skipping", file=sys.stderr) + + # Test 2: Block summary root post-Capella mismatch - corrupt block_roots + era8 = find_era_file("00008") + if era8: + data = read_era_file(era8) + # Corrupt state section (different offset than ERA 0) + corrupt_data = corrupt_bytes_at_offset(data, 16 + 100) + output = CORRUPT_DIR / "era8-corrupt-block-summary.era" + with open(output, 'wb') as f: + f.write(corrupt_data) + print(f"✓ Created era8-corrupt-block-summary.era ({len(corrupt_data)} bytes)") + else: + print("⚠ ERA 8 file not found, skipping", file=sys.stderr) + + # Test 3: Block root mismatch - corrupt a block + era2 = find_era_file("00002") + if era2: + data = read_era_file(era2) + # Find and corrupt a block (blocks come after state in ERA file) + # We'll corrupt somewhere in the middle where blocks likely are + corrupt_offset = len(data) // 3 # Rough guess at block location + corrupt_data = corrupt_bytes_at_offset(data, corrupt_offset) + output = CORRUPT_DIR / "era2-wrong-block-root.era" + with open(output, 'wb') as f: + f.write(corrupt_data) + print(f"✓ Created era2-wrong-block-root.era ({len(corrupt_data)} bytes)") + else: + print("⚠ ERA 2 file not found, skipping", file=sys.stderr) + + print(f"\n✓ Corrupt files created in: {CORRUPT_DIR}") + print(f"Total files: {len(list(CORRUPT_DIR.glob('*.era')))}") + +if __name__ == "__main__": + main() diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00000-effed385.era b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00000-effed385.era new file mode 100644 index 0000000000..f89fda3844 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00000-effed385.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00001-c33842ef.era b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00001-c33842ef.era new file mode 100644 index 0000000000..9ae316d812 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00001-c33842ef.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00002-8d6762e2.era b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00002-8d6762e2.era new file mode 100644 index 0000000000..187b85694f Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00002-8d6762e2.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00003-62feb608.era b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00003-62feb608.era new file mode 100644 index 0000000000..01f7749778 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00003-62feb608.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00004-3a2dbbc9.era b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00004-3a2dbbc9.era new file mode 100644 index 0000000000..19731cccb0 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00004-3a2dbbc9.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00005-8c0cd358.era b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00005-8c0cd358.era new file mode 100644 index 0000000000..472496b6e9 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00005-8c0cd358.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00006-f142ca09.era b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00006-f142ca09.era new file mode 100644 index 0000000000..98b2e940cf Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00006-f142ca09.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00007-dff7bc62.era b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00007-dff7bc62.era new file mode 100644 index 0000000000..6255ebc6cc Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00007-dff7bc62.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00008-b2b38a6c.era b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00008-b2b38a6c.era new file mode 100644 index 0000000000..c1d797b37b Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00008-b2b38a6c.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00009-2a918294.era b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00009-2a918294.era new file mode 100644 index 0000000000..39791cb392 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00009-2a918294.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00010-ce2964fb.era b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00010-ce2964fb.era new file mode 100644 index 0000000000..65184de626 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00010-ce2964fb.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00011-19a0f945.era b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00011-19a0f945.era new file mode 100644 index 0000000000..a15e012759 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00011-19a0f945.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00012-ea166d10.era b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00012-ea166d10.era new file mode 100644 index 0000000000..16682cf178 Binary files /dev/null and b/beacon_node/beacon_chain/tests/era_test_vectors/era/minimal-00012-ea166d10.era differ diff --git a/beacon_node/beacon_chain/tests/era_test_vectors/metadata.json b/beacon_node/beacon_chain/tests/era_test_vectors/metadata.json new file mode 100644 index 0000000000..c0d5895953 --- /dev/null +++ b/beacon_node/beacon_chain/tests/era_test_vectors/metadata.json @@ -0,0 +1 @@ +{"head_slot": 802, "head_root": "49f82639", "finalized_slot": 784, "finalized_root": "55720c58", "era_count": 13, "last_era_slot": 832} diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index fe3477dbfe..9b08ca9bc3 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -2076,6 +2076,17 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } + /// Store a pre-finalization state in the freezer database, applying ops atomically. + pub fn put_cold_state( + &self, + state_root: &Hash256, + state: &BeaconState, + ) -> Result<(), Error> { + let mut ops: Vec = Vec::new(); + self.store_cold_state(state_root, state, &mut ops)?; + self.cold_db.do_atomically(ops) + } + /// Store a pre-finalization state in the freezer database. pub fn store_cold_state( &self, diff --git a/consensus/types/src/state/historical_summary.rs b/consensus/types/src/state/historical_summary.rs index f520e46483..eca28b88fb 100644 --- a/consensus/types/src/state/historical_summary.rs +++ b/consensus/types/src/state/historical_summary.rs @@ -47,4 +47,8 @@ impl HistoricalSummary { state_summary_root: state.state_roots().tree_hash_root(), } } + + pub fn block_summary_root(&self) -> Hash256 { + self.block_summary_root + } } diff --git a/lcli/src/consume_era_files.rs b/lcli/src/consume_era_files.rs new file mode 100644 index 0000000000..a7e79b1122 --- /dev/null +++ b/lcli/src/consume_era_files.rs @@ -0,0 +1,128 @@ +use beacon_chain::era::consumer::EraFileDir; +use clap::ArgMatches; +use clap_utils::parse_required; +use environment::Environment; +use eth2_network_config::Eth2NetworkConfig; +use std::path::PathBuf; +use std::time::Duration; +use store::database::interface::BeaconNodeBackend; +use store::{HotColdDB, StoreConfig}; +use tracing::info; +use types::EthSpec; + +fn is_dir_non_empty(path: &PathBuf) -> bool { + path.exists() + && std::fs::read_dir(path) + .map(|mut entries| entries.next().is_some()) + .unwrap_or(false) +} + +pub fn run( + env: Environment, + network_config: Eth2NetworkConfig, + matches: &ArgMatches, +) -> Result<(), String> { + let datadir: PathBuf = parse_required(matches, "datadir")?; + let era_dir: PathBuf = parse_required(matches, "era-dir")?; + + let hot_path = datadir.join("chain_db"); + let cold_path = datadir.join("freezer_db"); + let blobs_path = datadir.join("blobs_db"); + + // Fail fast if database directories already contain data + if is_dir_non_empty(&hot_path) || is_dir_non_empty(&cold_path) { + return Err(format!( + "Database directories are not empty: {} / {}. \ + This command expects a fresh datadir.", + hot_path.display(), + cold_path.display(), + )); + } + + let spec = env.eth2_config.spec.clone(); + + info!( + hot_path = %hot_path.display(), + cold_path = %cold_path.display(), + era_dir = %era_dir.display(), + "Opening database" + ); + + std::fs::create_dir_all(&hot_path).map_err(|e| format!("Failed to create hot db dir: {e}"))?; + std::fs::create_dir_all(&cold_path) + .map_err(|e| format!("Failed to create cold db dir: {e}"))?; + std::fs::create_dir_all(&blobs_path) + .map_err(|e| format!("Failed to create blobs db dir: {e}"))?; + + let db = HotColdDB::, BeaconNodeBackend>::open( + &hot_path, + &cold_path, + &blobs_path, + |_, _, _| Ok(()), + StoreConfig::default(), + spec.clone(), + ) + .map_err(|e| format!("Failed to open database: {e:?}"))?; + + // Load genesis state from the network config + let mut genesis_state = env + .runtime() + .block_on(network_config.genesis_state::(None, Duration::from_secs(120))) + .map_err(|e| format!("Failed to load genesis state: {e}"))? + .ok_or("No genesis state available for this network")?; + + // Open ERA files directory and validate against genesis + let era_file_dir = EraFileDir::new::(&era_dir, &spec) + .map_err(|e| format!("Failed to open ERA dir: {e}"))?; + + // Verify ERA files match the network's genesis + if era_file_dir.genesis_validators_root() != genesis_state.genesis_validators_root() { + return Err(format!( + "ERA files genesis_validators_root ({:?}) does not match network genesis ({:?}). \ + Are the ERA files from the correct network?", + era_file_dir.genesis_validators_root(), + genesis_state.genesis_validators_root(), + )); + } + + info!( + genesis_validators_root = %genesis_state.genesis_validators_root(), + "Storing genesis state" + ); + + let genesis_root = genesis_state + .canonical_root() + .map_err(|e| format!("Failed to hash genesis state: {e:?}"))?; + db.put_cold_state(&genesis_root, &genesis_state) + .map_err(|e| format!("Failed to store genesis state: {e:?}"))?; + + let max_era = era_file_dir.max_era(); + info!(max_era, "Importing ERA files"); + + let start = std::time::Instant::now(); + for era_number in 1..=max_era { + era_file_dir + .import_era_file(&db, era_number, &spec, None) + .map_err(|e| format!("Failed to import ERA {era_number}: {e}"))?; + + if era_number % 100 == 0 || era_number == max_era { + let elapsed = start.elapsed(); + let rate = era_number as f64 / elapsed.as_secs_f64(); + info!( + era_number, + max_era, + ?elapsed, + rate = format!("{rate:.1} era/s"), + "Progress" + ); + } + } + + info!( + max_era, + elapsed = ?start.elapsed(), + "ERA file import complete. Database is ready." + ); + + Ok(()) +} diff --git a/lcli/src/main.rs b/lcli/src/main.rs index 63dd0f2c5b..82266e0639 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -1,11 +1,13 @@ mod block_root; mod check_deposit_data; +mod consume_era_files; mod generate_bootnode_enr; mod http_sync; mod indexed_attestations; mod mnemonic_validators; mod mock_el; mod parse_ssz; +mod produce_era_files; mod skip_slots; mod state_root; mod transition_blocks; @@ -571,6 +573,50 @@ fn main() { .display_order(0) ) ) + .subcommand( + Command::new("consume-era-files") + .about("Import ERA files into an empty database, producing a ready-to-use beacon node DB.") + .arg( + Arg::new("datadir") + .long("datadir") + .value_name("PATH") + .action(ArgAction::Set) + .required(true) + .help("Path to the beacon node data directory (will create chain_db, freezer_db, blobs_db inside).") + .display_order(0) + ) + .arg( + Arg::new("era-dir") + .long("era-dir") + .value_name("PATH") + .action(ArgAction::Set) + .required(true) + .help("Directory containing ERA files to import.") + .display_order(0) + ) + ) + .subcommand( + Command::new("produce-era-files") + .about("Produce ERA files from a fully reconstructed beacon node database.") + .arg( + Arg::new("datadir") + .long("datadir") + .value_name("PATH") + .action(ArgAction::Set) + .required(true) + .help("Path to the beacon node data directory (containing chain_db, freezer_db, blobs_db).") + .display_order(0) + ) + .arg( + Arg::new("output-dir") + .long("output-dir") + .value_name("PATH") + .action(ArgAction::Set) + .required(true) + .help("Directory to write ERA files to. Created if it does not exist.") + .display_order(0) + ) + ) .subcommand( Command::new("http-sync") .about("Manual sync") @@ -765,6 +811,13 @@ fn run(env_builder: EnvironmentBuilder, matches: &ArgMatches) -> } Some(("mock-el", matches)) => mock_el::run::(env, matches) .map_err(|e| format!("Failed to run mock-el command: {}", e)), + Some(("consume-era-files", matches)) => { + let network_config = get_network_config()?; + consume_era_files::run::(env, network_config, matches) + .map_err(|e| format!("Failed to consume ERA files: {}", e)) + } + Some(("produce-era-files", matches)) => produce_era_files::run::(env, matches) + .map_err(|e| format!("Failed to produce ERA files: {}", e)), Some(("http-sync", matches)) => { let network_config = get_network_config()?; http_sync::run::(env, network_config, matches) diff --git a/lcli/src/produce_era_files.rs b/lcli/src/produce_era_files.rs new file mode 100644 index 0000000000..e8fdd8fa66 --- /dev/null +++ b/lcli/src/produce_era_files.rs @@ -0,0 +1,90 @@ +use beacon_chain::era::producer; +use clap::ArgMatches; +use clap_utils::parse_required; +use environment::Environment; +use std::path::PathBuf; +use store::database::interface::BeaconNodeBackend; +use store::{HotColdDB, StoreConfig}; +use tracing::info; +use types::EthSpec; + +pub fn run(env: Environment, matches: &ArgMatches) -> Result<(), String> { + let datadir: PathBuf = parse_required(matches, "datadir")?; + let output_dir: PathBuf = parse_required(matches, "output-dir")?; + + let hot_path = datadir.join("chain_db"); + let cold_path = datadir.join("freezer_db"); + let blobs_path = datadir.join("blobs_db"); + + let spec = env.eth2_config.spec.clone(); + + info!( + hot_path = %hot_path.display(), + cold_path = %cold_path.display(), + output_dir = %output_dir.display(), + "Opening database" + ); + + let db = HotColdDB::, BeaconNodeBackend>::open( + &hot_path, + &cold_path, + &blobs_path, + |_, _, _| Ok(()), + StoreConfig::default(), + spec, + ) + .map_err(|e| format!("Failed to open database: {e:?}"))?; + + let anchor = db.get_anchor_info(); + let split = db.get_split_info(); + + info!( + anchor_slot = %anchor.anchor_slot, + state_lower_limit = %anchor.state_lower_limit, + state_upper_limit = %anchor.state_upper_limit, + oldest_block_slot = %anchor.oldest_block_slot, + split_slot = %split.slot, + "Database info" + ); + + // Verify reconstruction is complete: state_lower_limit should equal state_upper_limit + if !anchor.all_historic_states_stored() { + return Err(format!( + "State reconstruction is not complete. \ + state_lower_limit={}, state_upper_limit={}. \ + Run with --reconstruct-historic-states first.", + anchor.state_lower_limit, anchor.state_upper_limit, + )); + } + + // Verify block backfill is complete + if anchor.oldest_block_slot > 0 { + return Err(format!( + "Block backfill is not complete. oldest_block_slot={}. \ + Complete backfill sync first.", + anchor.oldest_block_slot, + )); + } + + let slots_per_historical_root = E::slots_per_historical_root() as u64; + // An ERA can only be created if its end slot <= split slot (finalized boundary) + let max_era = split.slot.as_u64() / slots_per_historical_root; + + info!(max_era, "Producing ERA files from 0 to max_era"); + + std::fs::create_dir_all(&output_dir) + .map_err(|e| format!("Failed to create output directory: {e}"))?; + + for era_number in 0..=max_era { + producer::create_era_file(&db, era_number, &output_dir) + .map_err(|e| format!("Failed to produce ERA file {era_number}: {e}"))?; + + if (era_number + 1) % 100 == 0 || era_number == max_era { + info!(era_number, max_era, "Progress"); + } + } + + info!(max_era, "ERA file production complete"); + + Ok(()) +}