mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-17 21:08:32 +00:00
Compare commits
3 Commits
epbs-devne
...
era-lcli
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
676ad5dbba | ||
|
|
f368a9a31e | ||
|
|
6cc3d63c8b |
161
Cargo.lock
generated
161
Cargo.lock
generated
@@ -1232,8 +1232,8 @@ dependencies = [
|
||||
"eth2_network_config",
|
||||
"ethereum_hashing",
|
||||
"ethereum_serde_utils",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz_derive",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"ethereum_ssz_derive 0.10.1",
|
||||
"execution_layer",
|
||||
"fixed_bytes",
|
||||
"fork_choice",
|
||||
@@ -1259,10 +1259,12 @@ dependencies = [
|
||||
"proto_array",
|
||||
"rand 0.9.2",
|
||||
"rayon",
|
||||
"reth-era",
|
||||
"safe_arith",
|
||||
"sensitive_url",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_yaml",
|
||||
"slasher",
|
||||
"slot_clock",
|
||||
"smallvec",
|
||||
@@ -1508,7 +1510,7 @@ dependencies = [
|
||||
"blst",
|
||||
"ethereum_hashing",
|
||||
"ethereum_serde_utils",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"fixed_bytes",
|
||||
"hex",
|
||||
"rand 0.9.2",
|
||||
@@ -1555,7 +1557,7 @@ dependencies = [
|
||||
"clap",
|
||||
"clap_utils",
|
||||
"eth2_network_config",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"hex",
|
||||
"lighthouse_network",
|
||||
"log",
|
||||
@@ -1613,7 +1615,7 @@ dependencies = [
|
||||
"bls",
|
||||
"context_deserialize",
|
||||
"eth2",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"lighthouse_version",
|
||||
"mockito",
|
||||
"reqwest",
|
||||
@@ -1881,7 +1883,7 @@ dependencies = [
|
||||
"clap",
|
||||
"dirs",
|
||||
"eth2_network_config",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"hex",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -1900,7 +1902,7 @@ dependencies = [
|
||||
"environment",
|
||||
"eth2",
|
||||
"eth2_config",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"execution_layer",
|
||||
"futures",
|
||||
"genesis",
|
||||
@@ -2334,6 +2336,16 @@ dependencies = [
|
||||
"syn 2.0.111",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "darling"
|
||||
version = "0.20.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee"
|
||||
dependencies = [
|
||||
"darling_core 0.20.11",
|
||||
"darling_macro 0.20.11",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "darling"
|
||||
version = "0.21.3"
|
||||
@@ -2354,6 +2366,20 @@ dependencies = [
|
||||
"darling_macro 0.23.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "darling_core"
|
||||
version = "0.20.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e"
|
||||
dependencies = [
|
||||
"fnv",
|
||||
"ident_case",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"strsim",
|
||||
"syn 2.0.111",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "darling_core"
|
||||
version = "0.21.3"
|
||||
@@ -2382,6 +2408,17 @@ dependencies = [
|
||||
"syn 2.0.111",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "darling_macro"
|
||||
version = "0.20.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
|
||||
dependencies = [
|
||||
"darling_core 0.20.11",
|
||||
"quote",
|
||||
"syn 2.0.111",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "darling_macro"
|
||||
version = "0.21.3"
|
||||
@@ -2506,7 +2543,7 @@ dependencies = [
|
||||
"alloy-json-abi",
|
||||
"alloy-primitives",
|
||||
"bls",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"hex",
|
||||
"reqwest",
|
||||
"serde_json",
|
||||
@@ -2842,8 +2879,8 @@ dependencies = [
|
||||
"context_deserialize",
|
||||
"educe",
|
||||
"eth2_network_config",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz_derive",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"ethereum_ssz_derive 0.10.1",
|
||||
"execution_layer",
|
||||
"fork_choice",
|
||||
"fs2",
|
||||
@@ -3123,8 +3160,8 @@ dependencies = [
|
||||
"enr",
|
||||
"eth2_keystore",
|
||||
"ethereum_serde_utils",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz_derive",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"ethereum_ssz_derive 0.10.1",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"libp2p-identity",
|
||||
@@ -3210,7 +3247,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"discv5",
|
||||
"eth2_config",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"fixed_bytes",
|
||||
"kzg",
|
||||
"pretty_reqwest_error",
|
||||
@@ -3275,6 +3312,21 @@ dependencies = [
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ethereum_ssz"
|
||||
version = "0.9.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0dcddb2554d19cde19b099fadddde576929d7a4d0c1cd3512d1fd95cf174375c"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"ethereum_serde_utils",
|
||||
"itertools 0.13.0",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"smallvec",
|
||||
"typenum",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ethereum_ssz"
|
||||
version = "0.10.1"
|
||||
@@ -3292,6 +3344,18 @@ dependencies = [
|
||||
"typenum",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ethereum_ssz_derive"
|
||||
version = "0.9.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a657b6b3b7e153637dc6bdc6566ad9279d9ee11a15b12cfb24a2e04360637e9f"
|
||||
dependencies = [
|
||||
"darling 0.20.11",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.111",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ethereum_ssz_derive"
|
||||
version = "0.10.1"
|
||||
@@ -3385,7 +3449,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"eth2",
|
||||
"ethereum_serde_utils",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"fixed_bytes",
|
||||
"fork_choice",
|
||||
"hash-db",
|
||||
@@ -3583,8 +3647,8 @@ name = "fork_choice"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"beacon_chain",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz_derive",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"ethereum_ssz_derive 0.10.1",
|
||||
"fixed_bytes",
|
||||
"logging",
|
||||
"metrics",
|
||||
@@ -3778,7 +3842,7 @@ version = "0.2.0"
|
||||
dependencies = [
|
||||
"bls",
|
||||
"ethereum_hashing",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"int_to_bytes",
|
||||
"merkle_proof",
|
||||
"rayon",
|
||||
@@ -4198,7 +4262,7 @@ dependencies = [
|
||||
"either",
|
||||
"eth2",
|
||||
"ethereum_serde_utils",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"execution_layer",
|
||||
"fixed_bytes",
|
||||
"futures",
|
||||
@@ -4839,8 +4903,8 @@ dependencies = [
|
||||
"educe",
|
||||
"ethereum_hashing",
|
||||
"ethereum_serde_utils",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz_derive",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"ethereum_ssz_derive 0.10.1",
|
||||
"hex",
|
||||
"rayon",
|
||||
"rust_eth_kzg",
|
||||
@@ -4880,7 +4944,7 @@ dependencies = [
|
||||
"eth2_network_config",
|
||||
"eth2_wallet",
|
||||
"ethereum_hashing",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"execution_layer",
|
||||
"fixed_bytes",
|
||||
"hex",
|
||||
@@ -5417,8 +5481,8 @@ dependencies = [
|
||||
"discv5",
|
||||
"either",
|
||||
"eth2",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz_derive",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"ethereum_ssz_derive 0.10.1",
|
||||
"fixed_bytes",
|
||||
"fnv",
|
||||
"futures",
|
||||
@@ -5761,8 +5825,8 @@ dependencies = [
|
||||
"context_deserialize",
|
||||
"educe",
|
||||
"ethereum_hashing",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz_derive",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"ethereum_ssz_derive 0.10.1",
|
||||
"itertools 0.13.0",
|
||||
"parking_lot",
|
||||
"rayon",
|
||||
@@ -6062,7 +6126,7 @@ dependencies = [
|
||||
"educe",
|
||||
"eth2",
|
||||
"eth2_network_config",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"execution_layer",
|
||||
"fixed_bytes",
|
||||
"fnv",
|
||||
@@ -6457,8 +6521,8 @@ dependencies = [
|
||||
"bitvec",
|
||||
"bls",
|
||||
"educe",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz_derive",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"ethereum_ssz_derive 0.10.1",
|
||||
"fixed_bytes",
|
||||
"itertools 0.14.0",
|
||||
"maplit",
|
||||
@@ -7023,8 +7087,8 @@ dependencies = [
|
||||
name = "proto_array"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz_derive",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"ethereum_ssz_derive 0.10.1",
|
||||
"fixed_bytes",
|
||||
"safe_arith",
|
||||
"serde",
|
||||
@@ -7450,6 +7514,21 @@ version = "0.7.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e061d1b48cb8d38042de4ae0a7a6401009d6143dc80d2e2d6f31f0bdd6470c7"
|
||||
|
||||
[[package]]
|
||||
name = "reth-era"
|
||||
version = "1.9.3"
|
||||
source = "git+https://github.com/paradigmxyz/reth.git?rev=62abfdaeb54e8a205a8ee085ddebd56047d93374#62abfdaeb54e8a205a8ee085ddebd56047d93374"
|
||||
dependencies = [
|
||||
"alloy-consensus",
|
||||
"alloy-eips",
|
||||
"alloy-primitives",
|
||||
"alloy-rlp",
|
||||
"ethereum_ssz 0.9.1",
|
||||
"ethereum_ssz_derive 0.9.1",
|
||||
"snap",
|
||||
"thiserror 2.0.17",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rfc6979"
|
||||
version = "0.4.0"
|
||||
@@ -8242,8 +8321,8 @@ dependencies = [
|
||||
"bls",
|
||||
"byteorder",
|
||||
"educe",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz_derive",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"ethereum_ssz_derive 0.10.1",
|
||||
"filesystem",
|
||||
"fixed_bytes",
|
||||
"flate2",
|
||||
@@ -8407,7 +8486,7 @@ dependencies = [
|
||||
"context_deserialize",
|
||||
"educe",
|
||||
"ethereum_serde_utils",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"itertools 0.14.0",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
@@ -8431,8 +8510,8 @@ dependencies = [
|
||||
"bls",
|
||||
"educe",
|
||||
"ethereum_hashing",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz_derive",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"ethereum_ssz_derive 0.10.1",
|
||||
"fixed_bytes",
|
||||
"int_to_bytes",
|
||||
"integer-sqrt",
|
||||
@@ -8459,7 +8538,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"beacon_chain",
|
||||
"bls",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"fixed_bytes",
|
||||
"state_processing",
|
||||
"tokio",
|
||||
@@ -8481,8 +8560,8 @@ dependencies = [
|
||||
"criterion",
|
||||
"db-key",
|
||||
"directory",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz_derive",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"ethereum_ssz_derive 0.10.1",
|
||||
"fixed_bytes",
|
||||
"itertools 0.14.0",
|
||||
"leveldb",
|
||||
@@ -9285,7 +9364,7 @@ checksum = "f7fd51aa83d2eb83b04570808430808b5d24fdbf479a4d5ac5dee4a2e2dd2be4"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"ethereum_hashing",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"smallvec",
|
||||
"typenum",
|
||||
]
|
||||
@@ -9350,8 +9429,8 @@ dependencies = [
|
||||
"eth2_interop_keypairs",
|
||||
"ethereum_hashing",
|
||||
"ethereum_serde_utils",
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz_derive",
|
||||
"ethereum_ssz 0.10.1",
|
||||
"ethereum_ssz_derive 0.10.1",
|
||||
"fixed_bytes",
|
||||
"hex",
|
||||
"int_to_bytes",
|
||||
|
||||
@@ -207,6 +207,7 @@ rand = "0.9.0"
|
||||
rayon = "1.7"
|
||||
regex = "1"
|
||||
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "stream", "rustls-tls"] }
|
||||
reth-era = { git = "https://github.com/paradigmxyz/reth.git", rev = "62abfdaeb54e8a205a8ee085ddebd56047d93374" }
|
||||
ring = "0.17"
|
||||
rpds = "0.11"
|
||||
rusqlite = { version = "0.38", features = ["bundled"] }
|
||||
|
||||
@@ -50,6 +50,7 @@ parking_lot = { workspace = true }
|
||||
proto_array = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
rayon = { workspace = true }
|
||||
reth-era = { workspace = true }
|
||||
safe_arith = { workspace = true }
|
||||
sensitive_url = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
@@ -79,6 +80,7 @@ maplit = { workspace = true }
|
||||
mockall = { workspace = true }
|
||||
mockall_double = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
serde_yaml = { workspace = true }
|
||||
|
||||
[[bench]]
|
||||
name = "benches"
|
||||
|
||||
507
beacon_node/beacon_chain/src/era/consumer.rs
Normal file
507
beacon_node/beacon_chain/src/era/consumer.rs
Normal file
@@ -0,0 +1,507 @@
|
||||
//! Import ERA files into a Lighthouse database.
|
||||
//!
|
||||
//! `EraFileDir` reads a directory of ERA files and imports them sequentially into the cold DB.
|
||||
//! Each ERA file is verified against `historical_roots` / `historical_summaries` from the
|
||||
//! highest-numbered ERA state, ensuring block and state integrity without trusting the files.
|
||||
//!
|
||||
//! Block roots are cross-checked against the ERA boundary state's `block_roots` vector.
|
||||
//! Block signatures are NOT verified — ERA files are trusted at that level.
|
||||
|
||||
use bls::FixedBytesExtended;
|
||||
use rayon::prelude::*;
|
||||
use reth_era::common::file_ops::StreamReader;
|
||||
use reth_era::era::file::EraReader;
|
||||
use reth_era::era::types::consensus::{CompressedBeaconState, CompressedSignedBeaconBlock};
|
||||
use std::fs::{self, File};
|
||||
use std::path::{Path, PathBuf};
|
||||
use store::{DBColumn, HotColdDB, ItemStore, KeyValueStoreOp};
|
||||
use tracing::{debug, debug_span, info, instrument, warn};
|
||||
use tree_hash::TreeHash;
|
||||
use types::{
|
||||
BeaconState, ChainSpec, EthSpec, Hash256, HistoricalBatch, HistoricalSummary,
|
||||
SignedBeaconBlock, Slot,
|
||||
};
|
||||
|
||||
fn decode_block<E: EthSpec>(
|
||||
compressed: CompressedSignedBeaconBlock,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<SignedBeaconBlock<E>, String> {
|
||||
let bytes = compressed
|
||||
.decompress()
|
||||
.map_err(|error| format!("failed to decompress block: {error:?}"))?;
|
||||
SignedBeaconBlock::from_ssz_bytes(&bytes, spec)
|
||||
.map_err(|error| format!("failed to decode block: {error:?}"))
|
||||
}
|
||||
|
||||
fn decode_state<E: EthSpec>(
|
||||
compressed: CompressedBeaconState,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<BeaconState<E>, String> {
|
||||
let bytes = compressed
|
||||
.decompress()
|
||||
.map_err(|error| format!("failed to decompress state: {error:?}"))?;
|
||||
BeaconState::from_ssz_bytes(&bytes, spec)
|
||||
.map_err(|error| format!("failed to decode state: {error:?}"))
|
||||
}
|
||||
|
||||
pub struct EraFileDir {
|
||||
dir: PathBuf,
|
||||
network_name: String,
|
||||
genesis_validators_root: Hash256,
|
||||
historical_roots: Vec<Hash256>,
|
||||
historical_summaries: Vec<HistoricalSummary>,
|
||||
max_era: u64,
|
||||
}
|
||||
|
||||
impl EraFileDir {
|
||||
pub fn new<E: EthSpec>(era_files_dir: &Path, spec: &ChainSpec) -> Result<Self, String> {
|
||||
let mut era_files = list_era_files(era_files_dir)?;
|
||||
era_files.sort_by_key(|(era_number, _)| *era_number);
|
||||
|
||||
let network_name = spec
|
||||
.config_name
|
||||
.clone()
|
||||
.unwrap_or_else(|| "unknown".to_string());
|
||||
|
||||
let Some((max_era, reference_path)) = era_files.last().cloned() else {
|
||||
return Err("era files directory is empty".to_string());
|
||||
};
|
||||
|
||||
let reference_state = read_era_state::<E>(&reference_path, &network_name, spec)?;
|
||||
|
||||
// historical_roots was frozen in capella, and continued as historical_summaries
|
||||
let historical_roots = reference_state.historical_roots().to_vec();
|
||||
// Pre-Capella states don't have historical_summaries property
|
||||
let historical_summaries = match reference_state.historical_summaries() {
|
||||
Ok(list) => list.to_vec(),
|
||||
Err(_) => vec![],
|
||||
};
|
||||
|
||||
let dir = era_files_dir.to_path_buf();
|
||||
let era_dir = Self {
|
||||
dir,
|
||||
network_name,
|
||||
genesis_validators_root: reference_state.genesis_validators_root(),
|
||||
historical_roots,
|
||||
historical_summaries,
|
||||
max_era,
|
||||
};
|
||||
|
||||
// Verify that every expected era file name exists in the directory.
|
||||
for era_number in 0..=era_dir.max_era {
|
||||
let expected = era_dir.expected_path(era_number);
|
||||
if !expected.exists() {
|
||||
return Err(format!("missing era file: {expected:?}"));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(era_dir)
|
||||
}
|
||||
|
||||
pub fn max_era(&self) -> u64 {
|
||||
self.max_era
|
||||
}
|
||||
|
||||
pub fn genesis_validators_root(&self) -> Hash256 {
|
||||
self.genesis_validators_root
|
||||
}
|
||||
|
||||
/// Import all ERA files into a fresh store, verifying genesis and importing ERAs 1..=max_era.
|
||||
pub fn import_all<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
&self,
|
||||
store: &HotColdDB<E, Hot, Cold>,
|
||||
genesis_state: &mut BeaconState<E>,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<(), String> {
|
||||
if self.genesis_validators_root != genesis_state.genesis_validators_root() {
|
||||
return Err(format!(
|
||||
"ERA files genesis_validators_root ({:?}) does not match network genesis ({:?}). \
|
||||
Are the ERA files from the correct network?",
|
||||
self.genesis_validators_root,
|
||||
genesis_state.genesis_validators_root(),
|
||||
));
|
||||
}
|
||||
|
||||
let genesis_root = genesis_state
|
||||
.canonical_root()
|
||||
.map_err(|e| format!("Failed to hash genesis state: {e:?}"))?;
|
||||
store
|
||||
.put_cold_state(&genesis_root, genesis_state)
|
||||
.map_err(|e| format!("Failed to store genesis state: {e:?}"))?;
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
for era_number in 1..=self.max_era {
|
||||
self.import_era_file(store, era_number, spec, None)?;
|
||||
|
||||
if era_number % 100 == 0 || era_number == self.max_era {
|
||||
let elapsed = start.elapsed();
|
||||
let rate = era_number as f64 / elapsed.as_secs_f64();
|
||||
info!(
|
||||
era_number,
|
||||
max_era = self.max_era,
|
||||
?elapsed,
|
||||
rate = format!("{rate:.1} era/s"),
|
||||
"Progress"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
max_era = self.max_era,
|
||||
elapsed = ?start.elapsed(),
|
||||
"ERA file import complete"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all, fields(era_number = %era_number))]
|
||||
pub fn import_era_file<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
&self,
|
||||
store: &HotColdDB<E, Hot, Cold>,
|
||||
era_number: u64,
|
||||
spec: &ChainSpec,
|
||||
trusted_state: Option<(Hash256, Slot)>,
|
||||
) -> Result<(), String> {
|
||||
let path = self.expected_path(era_number);
|
||||
debug!(?path, era_number, "Importing era file");
|
||||
let file = File::open(path).map_err(|error| format!("failed to open era file: {error}"))?;
|
||||
let era_file = {
|
||||
let _span = debug_span!("era_import_read").entered();
|
||||
EraReader::new(file)
|
||||
.read_and_assemble(self.network_name.clone())
|
||||
.map_err(|error| format!("failed to parse era file: {error:?}"))?
|
||||
};
|
||||
|
||||
// Consistency checks: ensure the era state matches the expected historical root and that
|
||||
// each block root matches the state block_roots for its slot.
|
||||
let mut state = {
|
||||
let _span = debug_span!("era_import_decode_state").entered();
|
||||
decode_state::<E>(era_file.group.era_state, spec)?
|
||||
};
|
||||
|
||||
// Verify trusted state root if provided
|
||||
if let Some((expected_root, expected_slot)) = trusted_state {
|
||||
if state.slot() != expected_slot {
|
||||
return Err(format!(
|
||||
"trusted slot mismatch: expected {expected_slot}, got {}",
|
||||
state.slot()
|
||||
));
|
||||
}
|
||||
let actual_root = state
|
||||
.canonical_root()
|
||||
.map_err(|e| format!("Failed to compute state root: {e:?}"))?;
|
||||
if actual_root != expected_root {
|
||||
return Err(format!(
|
||||
"trusted state root mismatch at slot {expected_slot}: \
|
||||
expected {expected_root:?}, got {actual_root:?}"
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let expected_root = self
|
||||
.era_file_name_root(era_number)
|
||||
.ok_or_else(|| format!("missing historical root for era {era_number}"))?;
|
||||
let actual_root = era_root_from_state(&state, era_number)?;
|
||||
if expected_root != actual_root {
|
||||
return Err(format!(
|
||||
"era root mismatch for era {era_number}: expected {expected_root:?}, got {actual_root:?}"
|
||||
));
|
||||
}
|
||||
|
||||
let slots_per_historical_root = E::slots_per_historical_root() as u64;
|
||||
let _start_slot = Slot::new(era_number.saturating_sub(1) * slots_per_historical_root);
|
||||
let end_slot = Slot::new(era_number * slots_per_historical_root);
|
||||
if state.slot() != end_slot {
|
||||
return Err(format!(
|
||||
"era state slot mismatch: expected {end_slot}, got {}",
|
||||
state.slot()
|
||||
));
|
||||
}
|
||||
|
||||
let historical_summaries_active_at_slot = match store.spec.capella_fork_epoch {
|
||||
// For mainnet case, Capella activates at 194048 epoch = 6209536 slot = 758 era number.
|
||||
// The last epoch processing before capella transition adds a last entry to historical
|
||||
// roots. So historical_roots.len() == 758 at the capella fork boundary. An ERA file
|
||||
// includes the state AFTER advanced (or applying) the block at the final slot, so the
|
||||
// state for ERA file 758 is of the Capella variant. However, historical summaries are
|
||||
// still empty.
|
||||
Some(epoch) => {
|
||||
epoch.start_slot(E::slots_per_epoch())
|
||||
+ Slot::new(E::slots_per_historical_root() as u64)
|
||||
}
|
||||
None => Slot::max_value(),
|
||||
};
|
||||
|
||||
// Check that the block roots vector in this state match the historical summary in the last
|
||||
// state. Asserts that the blocks are exactly the expected ones given a trusted final state
|
||||
if era_number == 0 {
|
||||
// Skip checking genesis state era file for now
|
||||
} else if state.slot() >= historical_summaries_active_at_slot {
|
||||
// Post-capella state, check against historical summaries
|
||||
// ```py
|
||||
// historical_summary = HistoricalSummary(
|
||||
// block_summary_root=hash_tree_root(state.block_roots),
|
||||
// state_summary_root=hash_tree_root(state.state_roots),
|
||||
// )
|
||||
// state.historical_summaries.append(historical_summary)
|
||||
// ```
|
||||
let index = era_number.saturating_sub(1) as usize;
|
||||
// historical_summaries started to be appended after capella, so we need to offset
|
||||
let summary_index = index
|
||||
.checked_sub(self.historical_roots.len())
|
||||
.ok_or_else(|| format!(
|
||||
"Not enough historical roots era number {era_number} index {index} historical_roots len {}",
|
||||
self.historical_roots.len()
|
||||
))?;
|
||||
let expected_root = self
|
||||
.historical_summaries
|
||||
.get(summary_index)
|
||||
.ok_or_else(|| format!("missing historical summary for era {era_number}"))?
|
||||
.block_summary_root();
|
||||
let actual_root = state.block_roots().tree_hash_root();
|
||||
if actual_root != expected_root {
|
||||
return Err(format!(
|
||||
"block summary root post-capella mismatch for era {}: {:?} != {:?}",
|
||||
era_number, expected_root, actual_root
|
||||
));
|
||||
}
|
||||
} else {
|
||||
// Pre-capella state, check against historical roots
|
||||
// ```py
|
||||
// historical_batch = HistoricalBatch(block_roots=state.block_roots, state_roots=state.state_roots)
|
||||
// state.historical_roots.append(hash_tree_root(historical_batch))
|
||||
// ```
|
||||
let index = era_number.saturating_sub(1) as usize;
|
||||
let expected_root = *self
|
||||
.historical_roots
|
||||
.get(index)
|
||||
.ok_or_else(|| format!("missing historical root for era {era_number}"))?;
|
||||
let historical_batch = HistoricalBatch::<E> {
|
||||
block_roots: state.block_roots().clone(),
|
||||
state_roots: state.state_roots().clone(),
|
||||
};
|
||||
let actual_root = historical_batch.tree_hash_root();
|
||||
if actual_root != expected_root {
|
||||
return Err(format!(
|
||||
"block summary root pre-capella mismatch for era {}: {:?} != {:?}",
|
||||
era_number, expected_root, actual_root
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
debug!(era_number, "Importing blocks from era file");
|
||||
// TODO(era): Block signatures are not verified here and are trusted.
|
||||
// decode and hash is split in two loops to track timings better. If we add spans for each
|
||||
// block it's too short and the data is not really useful.
|
||||
let decoded_blocks = {
|
||||
let _span = debug_span!("era_import_decode_blocks").entered();
|
||||
era_file
|
||||
.group
|
||||
.blocks
|
||||
.into_par_iter()
|
||||
.map(|compressed_block| decode_block::<E>(compressed_block, spec))
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
};
|
||||
let blocks_with_roots = {
|
||||
let _span = debug_span!("era_import_hash_blocks").entered();
|
||||
decoded_blocks
|
||||
.into_par_iter()
|
||||
.map(|block| (block.canonical_root(), block))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
let mut block_ops = vec![];
|
||||
{
|
||||
let _ = debug_span!("era_import_db_ops_blocks").entered();
|
||||
for (block_root, block) in blocks_with_roots {
|
||||
let slot = block.slot();
|
||||
// Check consistency that this block is expected w.r.t. the state in the era file.
|
||||
// Since we check that the state block roots match the historical summary, we know that
|
||||
// this block root is the expected one.
|
||||
let expected_block_root = state
|
||||
.get_block_root(slot)
|
||||
.map_err(|error| format!("failed to read block root {slot}: {error:?}"))?;
|
||||
if *expected_block_root != block_root {
|
||||
return Err(format!(
|
||||
"block root mismatch at slot {slot}: expected {expected_block_root:?}, got {block_root:?}"
|
||||
));
|
||||
}
|
||||
store
|
||||
.block_as_kv_store_ops(&block_root, block, &mut block_ops)
|
||||
.map_err(|error| format!("failed to store block: {error:?}"))?;
|
||||
}
|
||||
}
|
||||
{
|
||||
let _ = debug_span!("era_import_write_blocks").entered();
|
||||
store
|
||||
.hot_db
|
||||
.do_atomically(block_ops)
|
||||
.map_err(|error| format!("failed to store blocks: {error:?}"))?;
|
||||
}
|
||||
|
||||
// Populate the cold DB slot -> block root index from the state.block_roots()
|
||||
{
|
||||
let _span = debug_span!("era_import_write_block_index").entered();
|
||||
write_block_root_index_for_era(store, &state, era_number)?;
|
||||
}
|
||||
|
||||
debug!(era_number, "Importing state from era file");
|
||||
{
|
||||
let _span = debug_span!("era_import_write_state").entered();
|
||||
let state_root = state
|
||||
.canonical_root()
|
||||
.map_err(|error| format!("failed to hash state: {error:?}"))?;
|
||||
// Use put_cold_state as the split is not updated and we need the state into the cold store.
|
||||
store
|
||||
.put_cold_state(&state_root, &state)
|
||||
.map_err(|error| format!("failed to store state: {error:?}"))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn expected_path(&self, era_number: u64) -> PathBuf {
|
||||
let root = self
|
||||
.era_file_name_root(era_number)
|
||||
.unwrap_or_else(Hash256::zero);
|
||||
let short = root
|
||||
.as_slice()
|
||||
.iter()
|
||||
.take(4)
|
||||
.map(|byte| format!("{byte:02x}"))
|
||||
.collect::<String>();
|
||||
let filename = format!("{}-{era_number:05}-{short}.era", self.network_name);
|
||||
self.dir.join(filename)
|
||||
}
|
||||
|
||||
// era_file_name_root for file naming:
|
||||
// short-era-root is the first 4 bytes of the last historical root in the last state in the
|
||||
// era file, lower-case hex-encoded (8 characters), except the genesis era which instead
|
||||
// uses the genesis_validators_root field from the genesis state.
|
||||
// - The root is available as state.historical_roots[era - 1] except for genesis, which is
|
||||
// state.genesis_validators_root
|
||||
// - Post-Capella, the root must be computed from
|
||||
// `state.historical_summaries[era - state.historical_roots.len - 1]`
|
||||
fn era_file_name_root(&self, era_number: u64) -> Option<Hash256> {
|
||||
if era_number == 0 {
|
||||
return Some(self.genesis_validators_root);
|
||||
}
|
||||
let index = era_number.saturating_sub(1) as usize;
|
||||
if let Some(root) = self.historical_roots.get(index) {
|
||||
return Some(*root);
|
||||
}
|
||||
let summary_index = index.saturating_sub(self.historical_roots.len());
|
||||
self.historical_summaries
|
||||
.get(summary_index)
|
||||
.map(|summary| summary.tree_hash_root())
|
||||
}
|
||||
}
|
||||
|
||||
fn read_era_state<E: EthSpec>(
|
||||
path: &Path,
|
||||
network_name: &str,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<BeaconState<E>, String> {
|
||||
let file = File::open(path).map_err(|error| format!("failed to open era file: {error}"))?;
|
||||
let era_file = EraReader::new(file)
|
||||
.read_and_assemble(network_name.to_string())
|
||||
.map_err(|error| format!("failed to parse era file: {error:?}"))?;
|
||||
decode_state::<E>(era_file.group.era_state, spec)
|
||||
}
|
||||
|
||||
fn era_root_from_state<E: EthSpec>(
|
||||
state: &BeaconState<E>,
|
||||
era_number: u64,
|
||||
) -> Result<Hash256, String> {
|
||||
if era_number == 0 {
|
||||
return Ok(state.genesis_validators_root());
|
||||
}
|
||||
let index = era_number
|
||||
.checked_sub(1)
|
||||
.ok_or_else(|| "invalid era number".to_string())? as usize;
|
||||
if let Some(root) = state.historical_roots().get(index) {
|
||||
return Ok(*root);
|
||||
}
|
||||
if let Ok(summaries) = state.historical_summaries() {
|
||||
let summary_index = index.saturating_sub(state.historical_roots().len());
|
||||
let summary = summaries
|
||||
.get(summary_index)
|
||||
.ok_or_else(|| "missing historical summary".to_string())?;
|
||||
return Ok(summary.tree_hash_root());
|
||||
}
|
||||
Err(format!("missing historical root for era {era_number}"))
|
||||
}
|
||||
|
||||
fn write_block_root_index_for_era<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
store: &HotColdDB<E, Hot, Cold>,
|
||||
state: &BeaconState<E>,
|
||||
era_number: u64,
|
||||
) -> Result<(), String> {
|
||||
let end_slot = state.slot();
|
||||
let slots_per_historical_root = E::slots_per_historical_root() as u64;
|
||||
let expected_end_slot = Slot::new(era_number * slots_per_historical_root);
|
||||
if end_slot != expected_end_slot {
|
||||
return Err(format!(
|
||||
"era state slot mismatch: expected {expected_end_slot}, got {end_slot}"
|
||||
));
|
||||
}
|
||||
|
||||
let start_slot = end_slot.saturating_sub(slots_per_historical_root);
|
||||
|
||||
let ops = (start_slot.as_u64()..end_slot.as_u64())
|
||||
.map(|slot_u64| {
|
||||
let slot = Slot::new(slot_u64);
|
||||
let block_root = state
|
||||
.get_block_root(slot)
|
||||
.map_err(|error| format!("failed to read block root {slot}: {error:?}"))?;
|
||||
// TODO(era): Should we write BeaconBlockRoots for missed slots?
|
||||
Ok(KeyValueStoreOp::PutKeyValue(
|
||||
DBColumn::BeaconBlockRoots,
|
||||
slot_u64.to_be_bytes().to_vec(),
|
||||
block_root.as_slice().to_vec(),
|
||||
))
|
||||
})
|
||||
.collect::<Result<Vec<_>, String>>()?;
|
||||
|
||||
store
|
||||
.cold_db
|
||||
.do_atomically(ops)
|
||||
.map_err(|error| format!("failed to store block root index: {error:?}"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn list_era_files(dir: &Path) -> Result<Vec<(u64, PathBuf)>, String> {
|
||||
let entries = fs::read_dir(dir).map_err(|error| format!("failed to read era dir: {error}"))?;
|
||||
let mut era_files = Vec::new();
|
||||
|
||||
for entry in entries {
|
||||
let entry = entry.map_err(|error| format!("failed to read era entry: {error}"))?;
|
||||
let path = entry.path();
|
||||
let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if !file_name.ends_with(".era") {
|
||||
continue;
|
||||
}
|
||||
|
||||
let Some((prefix, _hash_part)) = file_name.rsplit_once('-') else {
|
||||
continue;
|
||||
};
|
||||
let Some((_network_name, era_part)) = prefix.rsplit_once('-') else {
|
||||
continue;
|
||||
};
|
||||
let Some(era_number) = era_part.parse().ok() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
era_files.push((era_number, path));
|
||||
}
|
||||
|
||||
if era_files.is_empty() {
|
||||
warn!(?dir, "Era files directory is empty");
|
||||
}
|
||||
|
||||
Ok(era_files)
|
||||
}
|
||||
18
beacon_node/beacon_chain/src/era/mod.rs
Normal file
18
beacon_node/beacon_chain/src/era/mod.rs
Normal file
@@ -0,0 +1,18 @@
|
||||
/// ERA file support for importing and exporting historical beacon chain data.
|
||||
///
|
||||
/// ERA files store beacon blocks and states in a standardized archive format, enabling
|
||||
/// efficient distribution of historical chain data between clients. Each ERA file covers
|
||||
/// one "era" of `SLOTS_PER_HISTORICAL_ROOT` slots (8192 on mainnet) and contains:
|
||||
/// - All beacon blocks in the slot range
|
||||
/// - The boundary `BeaconState` at the end of the range
|
||||
///
|
||||
/// Verification relies on `historical_roots` (pre-Capella) and `historical_summaries`
|
||||
/// (post-Capella) which commit to the block and state roots for each era.
|
||||
///
|
||||
/// Spec: <https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#historical-roots-updates>
|
||||
/// Format: <https://github.com/status-im/nimbus-eth2/blob/stable/docs/the_auditors_handbook/src/02.4_the_era_file_format.md>
|
||||
pub mod consumer;
|
||||
pub mod producer;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
332
beacon_node/beacon_chain/src/era/producer.rs
Normal file
332
beacon_node/beacon_chain/src/era/producer.rs
Normal file
@@ -0,0 +1,332 @@
|
||||
//! Export ERA files from a Lighthouse database.
|
||||
//!
|
||||
//! Reads blocks and states from the cold DB and writes them into the standardized ERA format.
|
||||
//! ERA files are produced either during historical reconstruction (backfilling) or upon
|
||||
//! finalization of new eras. Each file is named `{network}-{era_number}-{short_root}.era`.
|
||||
//!
|
||||
//! Files are written atomically (temp file + rename) so partial writes never appear as valid
|
||||
//! ERA files. Existing files are skipped, making production idempotent.
|
||||
|
||||
use rand::random;
|
||||
use reth_era::common::file_ops::{EraFileFormat, EraFileId, StreamWriter};
|
||||
use reth_era::era::file::{EraFile, EraWriter};
|
||||
use reth_era::era::types::consensus::{CompressedBeaconState, CompressedSignedBeaconBlock};
|
||||
use reth_era::era::types::group::{EraGroup, EraId, SlotIndex};
|
||||
use ssz::Encode;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::path::Path;
|
||||
use store::{HotColdDB, ItemStore};
|
||||
use tracing::{error, info};
|
||||
use tree_hash::TreeHash;
|
||||
use types::{BeaconState, EthSpec, Slot};
|
||||
|
||||
fn era_file_exists(dir: &Path, id: &EraId) -> bool {
|
||||
dir.join(id.to_file_name()).exists()
|
||||
}
|
||||
|
||||
fn era_file_exists_for_number(dir: &Path, network_name: &str, era_number: u64) -> bool {
|
||||
let prefix = format!("{}-{:05}-", network_name, era_number);
|
||||
let Ok(entries) = fs::read_dir(dir) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
for entry in entries.flatten() {
|
||||
let file_name = entry.file_name();
|
||||
let Some(name) = file_name.to_str() else {
|
||||
continue;
|
||||
};
|
||||
if name.starts_with(&prefix) && name.ends_with(".era") {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn maybe_produce_reconstruction_eras<
|
||||
E: EthSpec,
|
||||
Hot: ItemStore<E>,
|
||||
Cold: ItemStore<E>,
|
||||
>(
|
||||
db: &HotColdDB<E, Hot, Cold>,
|
||||
output_dir: &Path,
|
||||
) {
|
||||
let anchor = db.get_anchor_info();
|
||||
let max_era = anchor.state_lower_limit.as_u64() / E::slots_per_historical_root() as u64;
|
||||
|
||||
for era_number in 0..=max_era {
|
||||
if let Err(error) = create_era_file(db, era_number, output_dir) {
|
||||
error!(
|
||||
?error,
|
||||
era_number, "Era producer failed during reconstruction"
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn maybe_produce_finalization_era<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
db: &HotColdDB<E, Hot, Cold>,
|
||||
output_dir: &Path,
|
||||
finalized_slot: Slot,
|
||||
) {
|
||||
// This is the oldest slot for which we have a state and blocks available
|
||||
let anchor_slot = db.get_anchor_info().anchor_slot;
|
||||
// And finalized_slot is the most recent for which we have finalized state and blocks available
|
||||
|
||||
// We can produce an era file for era_number if
|
||||
// - anchor_slot <= start_slot(era_number) AND
|
||||
// - finalized_slot >= end_slot(era_number)
|
||||
let slots_per_hr = E::slots_per_historical_root() as u64;
|
||||
let lowest_era_file = anchor_slot.as_u64() / slots_per_hr;
|
||||
let Some(max_era_file) = ((finalized_slot.as_u64() + 1) / slots_per_hr).checked_sub(1) else {
|
||||
return;
|
||||
};
|
||||
for era_number in lowest_era_file..=max_era_file {
|
||||
if let Err(error) = create_era_file(db, era_number, output_dir) {
|
||||
error!(
|
||||
?error,
|
||||
era_number, "Era producer failed during finalization"
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_era_file<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
db: &HotColdDB<E, Hot, Cold>,
|
||||
era_number: u64,
|
||||
output_dir: &Path,
|
||||
) -> Result<(), String> {
|
||||
let network_name = db
|
||||
.spec
|
||||
.config_name
|
||||
.clone()
|
||||
.unwrap_or_else(|| "unknown".to_string());
|
||||
if era_file_exists_for_number(output_dir, &network_name, era_number) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let end_slot = Slot::new(era_number * E::slots_per_historical_root() as u64);
|
||||
|
||||
let mut state = db
|
||||
.load_cold_state_by_slot(end_slot)
|
||||
.map_err(|error| format!("failed to load era state: {error:?}"))?;
|
||||
|
||||
if state.slot() != end_slot {
|
||||
return Err(format!(
|
||||
"era state slot mismatch: expected {}, got {}",
|
||||
end_slot,
|
||||
state.slot()
|
||||
));
|
||||
}
|
||||
|
||||
let group = build_era_group(db, &mut state, era_number)?;
|
||||
let file_id = era_file_id::<E>(&network_name, era_number, &mut state)?;
|
||||
let file = EraFile::new(group, file_id);
|
||||
|
||||
fs::create_dir_all(output_dir)
|
||||
.map_err(|error| format!("failed to create era files dir: {error}"))?;
|
||||
|
||||
if era_file_exists(output_dir, file.id()) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
write_era_file_atomic(output_dir, &file)?;
|
||||
|
||||
info!(
|
||||
era_number,
|
||||
file = %file.id().to_file_name(),
|
||||
"Wrote era file"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn build_era_group<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
db: &HotColdDB<E, Hot, Cold>,
|
||||
state: &mut BeaconState<E>,
|
||||
era_number: u64,
|
||||
) -> Result<EraGroup, String> {
|
||||
// Era file 0 goes from slot 0 to 0, genesis state only
|
||||
let start_slot =
|
||||
Slot::new(era_number.saturating_sub(1) * E::slots_per_historical_root() as u64);
|
||||
let end_slot = Slot::new(era_number * E::slots_per_historical_root() as u64);
|
||||
|
||||
let compressed_state = CompressedBeaconState::from_ssz(&state.as_ssz_bytes())
|
||||
.map_err(|error| format!("failed to compress state: {error:?}"))?;
|
||||
|
||||
// Each entry has an 8-byte header; the version record is header-only.
|
||||
let mut offset: i64 = 8;
|
||||
let mut blocks: Vec<CompressedSignedBeaconBlock> = Vec::new();
|
||||
let mut block_data_starts: Vec<(Slot, i64)> = Vec::new();
|
||||
|
||||
// The era file number 0 contains the genesis state and nothing else
|
||||
if era_number > 0 {
|
||||
for slot_u64 in start_slot.as_u64()..end_slot.as_u64() {
|
||||
let slot = Slot::new(slot_u64);
|
||||
let block_root = state
|
||||
.get_block_root(slot)
|
||||
.map_err(|error| format!("failed to read block root {slot}: {error:?}"))?;
|
||||
|
||||
// Skip duplicate blocks (same root as previous slot), but only within this ERA
|
||||
if slot_u64 > start_slot.as_u64()
|
||||
&& let Ok(prev_root) = state.get_block_root(Slot::new(slot_u64 - 1))
|
||||
&& prev_root == block_root
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
let block = db
|
||||
.get_full_block(block_root)
|
||||
.map_err(|error| format!("failed to load block: {error:?}"))?
|
||||
.ok_or_else(|| format!("missing block for root {block_root:?}"))?;
|
||||
|
||||
let compressed = CompressedSignedBeaconBlock::from_ssz(&block.as_ssz_bytes())
|
||||
.map_err(|error| format!("failed to compress block: {error:?}"))?;
|
||||
|
||||
let data_len = compressed.data.len() as i64;
|
||||
let data_start = offset + 8;
|
||||
blocks.push(compressed);
|
||||
block_data_starts.push((slot, data_start));
|
||||
offset += 8 + data_len;
|
||||
}
|
||||
}
|
||||
|
||||
let state_data_len = compressed_state.data.len() as i64;
|
||||
// Data starts after the 8-byte header.
|
||||
let state_data_start = offset + 8;
|
||||
offset += 8 + state_data_len;
|
||||
|
||||
let block_index_start = offset;
|
||||
let slot_count = E::slots_per_historical_root();
|
||||
// SlotIndex layout: starting_slot (8) + offsets (slot_count * 8) + count (8).
|
||||
let block_index_len = 8 + slot_count as i64 * 8 + 8;
|
||||
if era_number > 0 {
|
||||
offset += 8 + block_index_len;
|
||||
}
|
||||
|
||||
let state_index_start = offset;
|
||||
// Offset is relative to the start of the slot index data (after the 8-byte header)
|
||||
let state_offset = state_data_start - (state_index_start + 8);
|
||||
let state_slot_index = SlotIndex::new(end_slot.as_u64(), vec![state_offset]);
|
||||
|
||||
let group = if era_number > 0 {
|
||||
let mut offsets = vec![0i64; slot_count];
|
||||
for (slot, data_start) in &block_data_starts {
|
||||
let slot_index = slot
|
||||
.as_u64()
|
||||
.checked_sub(start_slot.as_u64())
|
||||
.ok_or_else(|| "slot underflow while building block index".to_string())?
|
||||
as usize;
|
||||
offsets[slot_index] = *data_start - block_index_start;
|
||||
}
|
||||
let block_index = SlotIndex::new(start_slot.as_u64(), offsets);
|
||||
EraGroup::with_block_index(blocks, compressed_state, block_index, state_slot_index)
|
||||
} else {
|
||||
EraGroup::new(blocks, compressed_state, state_slot_index)
|
||||
};
|
||||
Ok(group)
|
||||
}
|
||||
|
||||
fn short_historical_root<E: EthSpec>(
|
||||
state: &mut BeaconState<E>,
|
||||
era_number: u64,
|
||||
) -> Result<[u8; 4], String> {
|
||||
let root = if era_number == 0 {
|
||||
state.genesis_validators_root()
|
||||
} else {
|
||||
let era_index = era_number
|
||||
.checked_sub(1)
|
||||
.ok_or_else(|| "era index underflow".to_string())?;
|
||||
let roots_len = state.historical_roots_mut().len();
|
||||
if era_index < roots_len as u64 {
|
||||
*state
|
||||
.historical_roots_mut()
|
||||
.get(era_index as usize)
|
||||
.ok_or_else(|| "historical root missing".to_string())?
|
||||
} else {
|
||||
let summary_index = era_index
|
||||
.checked_sub(roots_len as u64)
|
||||
.ok_or_else(|| "historical summary index underflow".to_string())?;
|
||||
let summaries = state
|
||||
.historical_summaries_mut()
|
||||
.map_err(|error| format!("failed to access historical summaries: {error:?}"))?;
|
||||
let summary = summaries
|
||||
.get(summary_index as usize)
|
||||
.ok_or_else(|| "historical summary missing".to_string())?;
|
||||
summary.tree_hash_root()
|
||||
}
|
||||
};
|
||||
|
||||
let mut short_hash = [0u8; 4];
|
||||
short_hash.copy_from_slice(&root.as_slice()[..4]);
|
||||
Ok(short_hash)
|
||||
}
|
||||
|
||||
/// Write an era file atomically using a temp file + rename.
|
||||
///
|
||||
/// If the process crashes mid-write, only the temp file is left behind; the final file is
|
||||
/// created via rename, so it is either complete or absent. The era file existence check only
|
||||
/// considers `.era` files, so partial temp files are ignored safely.
|
||||
fn write_era_file_atomic(output_dir: &Path, file: &EraFile) -> Result<(), String> {
|
||||
let filename = file.id().to_file_name();
|
||||
let final_path = output_dir.join(&filename);
|
||||
if final_path.exists() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Create a unique temp file and write the full era contents into it.
|
||||
let tmp_name = format!("{filename}.tmp-{:016x}", random::<u64>());
|
||||
let tmp_path = output_dir.join(tmp_name);
|
||||
let mut file_handle = OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(&tmp_path)
|
||||
.map_err(|error| format!("failed to create temp file: {error}"))?;
|
||||
{
|
||||
let mut writer = EraWriter::new(&mut file_handle);
|
||||
writer
|
||||
.write_file(file)
|
||||
.map_err(|error| format!("failed to write era file: {error:?}"))?;
|
||||
}
|
||||
file_handle
|
||||
.sync_all()
|
||||
.map_err(|error| format!("failed to fsync era temp file: {error}"))?;
|
||||
|
||||
// Atomically publish; if another writer won, clean up and exit.
|
||||
if let Err(error) = fs::rename(&tmp_path, &final_path) {
|
||||
if error.kind() == std::io::ErrorKind::AlreadyExists && final_path.exists() {
|
||||
let _ = fs::remove_file(&tmp_path);
|
||||
return Ok(());
|
||||
}
|
||||
return Err(format!("failed to rename era temp file: {error}"));
|
||||
}
|
||||
|
||||
// Best-effort directory sync to make the rename durable.
|
||||
if let Ok(dir_handle) = File::open(output_dir) {
|
||||
let _ = dir_handle.sync_all();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn era_file_id<E: EthSpec>(
|
||||
network_name: &str,
|
||||
era_number: u64,
|
||||
state: &mut BeaconState<E>,
|
||||
) -> Result<EraId, String> {
|
||||
// reth_era uses hardcoded SLOTS_PER_HISTORICAL_ROOT=8192 to compute era number from start_slot.
|
||||
// To get the correct filename era number, we pass era_number * 8192 as the start_slot.
|
||||
const RETH_SLOTS_PER_HISTORICAL_ROOT: u64 = 8192;
|
||||
let reth_start_slot = era_number * RETH_SLOTS_PER_HISTORICAL_ROOT;
|
||||
|
||||
let slot_count = if era_number == 0 {
|
||||
0
|
||||
} else {
|
||||
E::slots_per_historical_root() as u32
|
||||
};
|
||||
let short_hash = short_historical_root(state, era_number)?;
|
||||
Ok(EraId::new(network_name, reth_start_slot, slot_count).with_hash(short_hash))
|
||||
}
|
||||
388
beacon_node/beacon_chain/src/era/tests.rs
Normal file
388
beacon_node/beacon_chain/src/era/tests.rs
Normal file
@@ -0,0 +1,388 @@
|
||||
/// ERA file consumer + producer tests using minimal preset test vectors.
|
||||
///
|
||||
/// Test vectors: 13 ERA files from a Nimbus minimal testnet.
|
||||
/// - Electra from genesis, Fulu at epoch 100000
|
||||
/// - SLOTS_PER_HISTORICAL_ROOT = 64 (one ERA = 64 slots = 8 epochs)
|
||||
/// - 13 ERA files covering 832 slots, 767 blocks, 1024 validators
|
||||
///
|
||||
/// All subtests run from a single #[test] to avoid nextest download races
|
||||
/// (same pattern as slashing_protection/tests/interop.rs).
|
||||
use super::consumer::EraFileDir;
|
||||
use reth_era::common::file_ops::StreamReader;
|
||||
use serde::Deserialize;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, LazyLock};
|
||||
use store::{DBColumn, HotColdDB, KeyValueStore, StoreConfig};
|
||||
use types::{BeaconState, ChainSpec, Config, EthSpec, Hash256, MinimalEthSpec, Slot};
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Metadata {
|
||||
head_slot: u64,
|
||||
head_root: String,
|
||||
era_count: u64,
|
||||
}
|
||||
|
||||
static TEST_VECTORS_DIR: LazyLock<PathBuf> = LazyLock::new(|| {
|
||||
let dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
|
||||
.join("tests")
|
||||
.join("era_test_vectors");
|
||||
let make_output = std::process::Command::new("make")
|
||||
.current_dir(&dir)
|
||||
.output()
|
||||
.expect("need `make` to download ERA test vectors");
|
||||
if !make_output.status.success() {
|
||||
eprintln!("{}", String::from_utf8_lossy(&make_output.stderr));
|
||||
panic!("Running `make` for ERA test vectors failed, see above");
|
||||
}
|
||||
dir.join("vectors")
|
||||
});
|
||||
|
||||
fn test_vectors_dir() -> &'static PathBuf {
|
||||
&TEST_VECTORS_DIR
|
||||
}
|
||||
|
||||
fn load_test_spec() -> ChainSpec {
|
||||
let config_str =
|
||||
std::fs::read_to_string(test_vectors_dir().join("config.yaml")).expect("read config.yaml");
|
||||
let config: Config = serde_yaml::from_str(&config_str).expect("parse config");
|
||||
config
|
||||
.apply_to_chain_spec::<MinimalEthSpec>(&ChainSpec::minimal())
|
||||
.expect("apply config")
|
||||
}
|
||||
|
||||
fn load_genesis_state(spec: &ChainSpec) -> BeaconState<MinimalEthSpec> {
|
||||
let era_dir = test_vectors_dir().join("era");
|
||||
let era0_path = std::fs::read_dir(&era_dir)
|
||||
.expect("read era dir")
|
||||
.filter_map(|e| e.ok())
|
||||
.find(|e| e.file_name().to_string_lossy().contains("-00000-"))
|
||||
.expect("ERA 0 file must exist");
|
||||
let file = std::fs::File::open(era0_path.path()).expect("open ERA 0");
|
||||
let era = reth_era::era::file::EraReader::new(file)
|
||||
.read_and_assemble("minimal".to_string())
|
||||
.expect("parse ERA 0");
|
||||
let state_bytes = era
|
||||
.group
|
||||
.era_state
|
||||
.decompress()
|
||||
.expect("decompress ERA 0 state");
|
||||
BeaconState::from_ssz_bytes(&state_bytes, spec).expect("decode genesis state from ERA 0")
|
||||
}
|
||||
|
||||
type TestStore = HotColdDB<
|
||||
MinimalEthSpec,
|
||||
store::MemoryStore<MinimalEthSpec>,
|
||||
store::MemoryStore<MinimalEthSpec>,
|
||||
>;
|
||||
|
||||
fn import_all_era_files() -> (TestStore, ChainSpec, u64) {
|
||||
let spec = load_test_spec();
|
||||
let era_dir_path = test_vectors_dir().join("era");
|
||||
let era_dir = EraFileDir::new::<MinimalEthSpec>(&era_dir_path, &spec).expect("open ERA dir");
|
||||
let max_era = era_dir.max_era();
|
||||
|
||||
let store = HotColdDB::open_ephemeral(StoreConfig::default(), Arc::new(spec.clone()))
|
||||
.expect("create store");
|
||||
|
||||
let mut genesis_state = load_genesis_state(&spec);
|
||||
era_dir
|
||||
.import_all(&store, &mut genesis_state, &spec)
|
||||
.expect("import all ERA files");
|
||||
|
||||
(store, spec, max_era)
|
||||
}
|
||||
|
||||
fn empty_store(spec: &ChainSpec) -> TestStore {
|
||||
let store =
|
||||
HotColdDB::open_ephemeral(StoreConfig::default(), Arc::new(spec.clone())).expect("store");
|
||||
let mut genesis = load_genesis_state(spec);
|
||||
let root = genesis.canonical_root().expect("hash");
|
||||
let mut ops = vec![];
|
||||
store
|
||||
.store_cold_state(&root, &genesis, &mut ops)
|
||||
.expect("ops");
|
||||
store.cold_db.do_atomically(ops).expect("write");
|
||||
store
|
||||
}
|
||||
|
||||
fn era_dir_with_corrupt(corrupt_file: &str, target_pattern: &str) -> tempfile::TempDir {
|
||||
let tmp = tempfile::TempDir::new().expect("tmp");
|
||||
let src = test_vectors_dir();
|
||||
let dst = tmp.path().join("era");
|
||||
std::fs::create_dir_all(&dst).expect("mkdir");
|
||||
|
||||
for entry in std::fs::read_dir(src.join("era")).expect("readdir") {
|
||||
let entry = entry.expect("entry");
|
||||
let name = entry.file_name().to_string_lossy().to_string();
|
||||
if name.contains(target_pattern) {
|
||||
std::fs::copy(src.join("corrupt").join(corrupt_file), dst.join(&name))
|
||||
.expect("copy corrupt");
|
||||
} else {
|
||||
std::fs::copy(entry.path(), dst.join(&name)).expect("copy");
|
||||
}
|
||||
}
|
||||
tmp
|
||||
}
|
||||
|
||||
fn assert_import_fails(
|
||||
corrupt_file: &str,
|
||||
target_pattern: &str,
|
||||
target_era: u64,
|
||||
expected_err: &str,
|
||||
) {
|
||||
let tmp = era_dir_with_corrupt(corrupt_file, target_pattern);
|
||||
let spec = load_test_spec();
|
||||
let era_dir = EraFileDir::new::<MinimalEthSpec>(&tmp.path().join("era"), &spec)
|
||||
.expect("init should succeed");
|
||||
let store = empty_store(&spec);
|
||||
|
||||
for era in 0..target_era {
|
||||
era_dir
|
||||
.import_era_file(&store, era, &spec, None)
|
||||
.unwrap_or_else(|e| panic!("ERA {era}: {e}"));
|
||||
}
|
||||
|
||||
let err = era_dir
|
||||
.import_era_file(&store, target_era, &spec, None)
|
||||
.unwrap_err();
|
||||
assert!(
|
||||
err.contains(expected_err),
|
||||
"expected \"{expected_err}\", got: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
fn era3_correct_root_and_slot(spec: &ChainSpec) -> (Hash256, types::Slot) {
|
||||
let era3_file = std::fs::read_dir(test_vectors_dir().join("era"))
|
||||
.expect("readdir")
|
||||
.filter_map(|e| e.ok())
|
||||
.find(|e| e.file_name().to_string_lossy().contains("-00003-"))
|
||||
.expect("ERA 3");
|
||||
let file = std::fs::File::open(era3_file.path()).expect("open");
|
||||
let era = reth_era::era::file::EraReader::new(file)
|
||||
.read_and_assemble("minimal".to_string())
|
||||
.expect("parse");
|
||||
let state_bytes = era.group.era_state.decompress().expect("decompress");
|
||||
let mut state: BeaconState<MinimalEthSpec> =
|
||||
BeaconState::from_ssz_bytes(&state_bytes, spec).expect("decode");
|
||||
let root = state.canonical_root().expect("root");
|
||||
let slot = state.slot();
|
||||
(root, slot)
|
||||
}
|
||||
|
||||
// Single #[test] to avoid nextest parallel download races.
|
||||
// See slashing_protection/tests/interop.rs for the same pattern.
|
||||
#[test]
|
||||
fn era_test_vectors() {
|
||||
consumer_imports_and_verifies();
|
||||
producer_output_is_byte_identical();
|
||||
rejects_corrupted_block_decompression();
|
||||
rejects_corrupted_genesis_state();
|
||||
rejects_corrupted_middle_state();
|
||||
rejects_corrupted_reference_state();
|
||||
rejects_wrong_era_content();
|
||||
rejects_wrong_era_root();
|
||||
rejects_corrupt_block_summary();
|
||||
rejects_wrong_block_root();
|
||||
rejects_mutated_state_with_trusted_root();
|
||||
rejects_wrong_trusted_state_root();
|
||||
}
|
||||
|
||||
fn load_metadata() -> Metadata {
|
||||
let path = test_vectors_dir().join("metadata.json");
|
||||
let data = std::fs::read_to_string(&path).expect("read metadata.json");
|
||||
serde_json::from_str(&data).expect("parse metadata.json")
|
||||
}
|
||||
|
||||
fn consumer_imports_and_verifies() {
|
||||
let metadata = load_metadata();
|
||||
let (store, _spec, max_era) = import_all_era_files();
|
||||
let slots_per_era = MinimalEthSpec::slots_per_historical_root() as u64;
|
||||
|
||||
assert_eq!(max_era + 1, metadata.era_count, "era count mismatch");
|
||||
|
||||
// The last indexed slot is (max_era * slots_per_era - 1), since the ERA boundary
|
||||
// state covers [start_slot, end_slot) where end_slot = era_number * slots_per_era.
|
||||
let last_slot = max_era * slots_per_era - 1;
|
||||
|
||||
// Verify head block root matches metadata
|
||||
let head_key = metadata.head_slot.to_be_bytes().to_vec();
|
||||
let head_root_bytes = store
|
||||
.cold_db
|
||||
.get_bytes(DBColumn::BeaconBlockRoots, &head_key)
|
||||
.expect("read head root index")
|
||||
.unwrap_or_else(|| panic!("no block root at head slot {}", metadata.head_slot));
|
||||
let head_root = Hash256::from_slice(&head_root_bytes);
|
||||
|
||||
let expected_head_root_bytes = hex::decode(&metadata.head_root).expect("decode head_root hex");
|
||||
let expected_head_root = Hash256::from_slice(&expected_head_root_bytes);
|
||||
assert_eq!(
|
||||
head_root, expected_head_root,
|
||||
"head root mismatch at slot {}: got {head_root:?}",
|
||||
metadata.head_slot
|
||||
);
|
||||
|
||||
// Verify the head block exists and has the correct root
|
||||
let head_block = store
|
||||
.get_full_block(&head_root)
|
||||
.expect("query head block")
|
||||
.unwrap_or_else(|| panic!("head block missing at slot {}", metadata.head_slot));
|
||||
assert_eq!(head_block.canonical_root(), head_root);
|
||||
assert_eq!(
|
||||
head_block.slot(),
|
||||
Slot::new(metadata.head_slot),
|
||||
"last indexed slot is {last_slot}"
|
||||
);
|
||||
}
|
||||
|
||||
fn producer_output_is_byte_identical() {
|
||||
let (store, _spec, max_era) = import_all_era_files();
|
||||
let output = PathBuf::from("/tmp/era_producer_test_output");
|
||||
let _ = std::fs::remove_dir_all(&output);
|
||||
std::fs::create_dir_all(&output).expect("mkdir");
|
||||
|
||||
for era in 0..=max_era {
|
||||
super::producer::create_era_file(&store, era, &output)
|
||||
.unwrap_or_else(|e| panic!("produce ERA {era}: {e}"));
|
||||
}
|
||||
|
||||
let mut originals: Vec<_> = std::fs::read_dir(test_vectors_dir().join("era"))
|
||||
.expect("readdir")
|
||||
.filter_map(|e| e.ok())
|
||||
.filter(|e| e.file_name().to_string_lossy().ends_with(".era"))
|
||||
.collect();
|
||||
originals.sort_by_key(|e| e.file_name());
|
||||
|
||||
let mut produced: Vec<_> = std::fs::read_dir(&output)
|
||||
.expect("readdir")
|
||||
.filter_map(|e| e.ok())
|
||||
.filter(|e| e.file_name().to_string_lossy().ends_with(".era"))
|
||||
.collect();
|
||||
produced.sort_by_key(|e| e.file_name());
|
||||
|
||||
assert_eq!(originals.len(), produced.len(), "file count mismatch");
|
||||
|
||||
for (orig, prod) in originals.iter().zip(produced.iter()) {
|
||||
assert_eq!(
|
||||
std::fs::read(orig.path()).expect("read"),
|
||||
std::fs::read(prod.path()).expect("read"),
|
||||
"ERA mismatch: {:?}",
|
||||
orig.file_name()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn rejects_corrupted_block_decompression() {
|
||||
assert_import_fails("era1-corrupt-block.era", "-00001-", 1, "decompress");
|
||||
}
|
||||
|
||||
fn rejects_corrupted_genesis_state() {
|
||||
assert_import_fails("era0-corrupt-state.era", "-00000-", 0, "decompress");
|
||||
}
|
||||
|
||||
fn rejects_corrupted_middle_state() {
|
||||
assert_import_fails("era5-corrupt-state.era", "-00005-", 5, "decompress");
|
||||
}
|
||||
|
||||
fn rejects_corrupted_reference_state() {
|
||||
let tmp = era_dir_with_corrupt("era12-corrupt-state.era", "-00012-");
|
||||
let spec = load_test_spec();
|
||||
match EraFileDir::new::<MinimalEthSpec>(&tmp.path().join("era"), &spec) {
|
||||
Ok(_) => panic!("should fail with corrupted reference state"),
|
||||
Err(err) => assert!(err.contains("decompress"), "expected decompress: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn rejects_wrong_era_content() {
|
||||
assert_import_fails(
|
||||
"era3-wrong-content.era",
|
||||
"-00003-",
|
||||
3,
|
||||
"era state slot mismatch",
|
||||
);
|
||||
}
|
||||
|
||||
fn rejects_wrong_era_root() {
|
||||
assert_import_fails("era0-wrong-root.era", "-00000-", 0, "era root mismatch");
|
||||
}
|
||||
|
||||
fn rejects_corrupt_block_summary() {
|
||||
assert_import_fails(
|
||||
"era8-corrupt-block-summary.era",
|
||||
"-00008-",
|
||||
8,
|
||||
"block summary root post-capella mismatch",
|
||||
);
|
||||
}
|
||||
|
||||
fn rejects_wrong_block_root() {
|
||||
assert_import_fails(
|
||||
"era2-wrong-block-root.era",
|
||||
"-00002-",
|
||||
2,
|
||||
"block root mismatch",
|
||||
);
|
||||
}
|
||||
|
||||
fn rejects_mutated_state_with_trusted_root() {
|
||||
let tmp = era_dir_with_corrupt("era3-wrong-state-root.era", "-00003-");
|
||||
let spec = load_test_spec();
|
||||
let era_dir = EraFileDir::new::<MinimalEthSpec>(&tmp.path().join("era"), &spec)
|
||||
.expect("init should succeed");
|
||||
let store = empty_store(&spec);
|
||||
|
||||
for era in 0..3 {
|
||||
era_dir
|
||||
.import_era_file(&store, era, &spec, None)
|
||||
.unwrap_or_else(|e| panic!("ERA {era}: {e}"));
|
||||
}
|
||||
|
||||
let (correct_root, slot) = era3_correct_root_and_slot(&spec);
|
||||
|
||||
let err = era_dir
|
||||
.import_era_file(&store, 3, &spec, Some((correct_root, slot)))
|
||||
.unwrap_err();
|
||||
assert!(
|
||||
err.contains("trusted state root mismatch"),
|
||||
"expected trusted state root mismatch: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
fn rejects_wrong_trusted_state_root() {
|
||||
let spec = load_test_spec();
|
||||
let store = empty_store(&spec);
|
||||
let era_dir_path = test_vectors_dir().join("era");
|
||||
let era_dir = EraFileDir::new::<MinimalEthSpec>(&era_dir_path, &spec).expect("open");
|
||||
|
||||
for era in 0..=2 {
|
||||
era_dir
|
||||
.import_era_file(&store, era, &spec, None)
|
||||
.unwrap_or_else(|e| panic!("ERA {era}: {e}"));
|
||||
}
|
||||
|
||||
let (correct_root, slot) = era3_correct_root_and_slot(&spec);
|
||||
|
||||
era_dir
|
||||
.import_era_file(&store, 3, &spec, Some((correct_root, slot)))
|
||||
.expect("correct root should pass");
|
||||
|
||||
let wrong_root = {
|
||||
let mut bytes: [u8; 32] = correct_root.into();
|
||||
bytes[0] ^= 0x01;
|
||||
Hash256::from(bytes)
|
||||
};
|
||||
|
||||
let store2 = empty_store(&spec);
|
||||
for era in 0..=2 {
|
||||
era_dir
|
||||
.import_era_file(&store2, era, &spec, None)
|
||||
.unwrap_or_else(|e| panic!("ERA {era}: {e}"));
|
||||
}
|
||||
|
||||
let err = era_dir
|
||||
.import_era_file(&store2, 3, &spec, Some((wrong_root, slot)))
|
||||
.unwrap_err();
|
||||
assert!(
|
||||
err.contains("trusted state root mismatch"),
|
||||
"expected trusted state root mismatch: {err}"
|
||||
);
|
||||
}
|
||||
@@ -21,6 +21,7 @@ pub mod custody_context;
|
||||
pub mod data_availability_checker;
|
||||
pub mod data_column_verification;
|
||||
mod early_attester_cache;
|
||||
pub mod era;
|
||||
mod errors;
|
||||
pub mod events;
|
||||
pub mod execution_payload;
|
||||
|
||||
2
beacon_node/beacon_chain/tests/era_test_vectors/.gitignore
vendored
Normal file
2
beacon_node/beacon_chain/tests/era_test_vectors/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
vectors
|
||||
*.tar.gz
|
||||
17
beacon_node/beacon_chain/tests/era_test_vectors/Makefile
Normal file
17
beacon_node/beacon_chain/tests/era_test_vectors/Makefile
Normal file
@@ -0,0 +1,17 @@
|
||||
VECTORS_TAG := 52eb9dd94a09153b8b07c9bba4b08adca0d6e219
|
||||
OUTPUT_DIR := vectors
|
||||
TARBALL := $(OUTPUT_DIR)-$(VECTORS_TAG).tar.gz
|
||||
ARCHIVE_URL := https://github.com/dapplion/era-test-vectors/tarball/$(VECTORS_TAG)
|
||||
|
||||
$(OUTPUT_DIR): $(TARBALL)
|
||||
rm -rf $@
|
||||
mkdir -p $@
|
||||
tar --strip-components=1 -xzf $^ -C $@
|
||||
|
||||
$(TARBALL):
|
||||
curl --fail -L -o $@ $(ARCHIVE_URL)
|
||||
|
||||
clean:
|
||||
rm -rf $(OUTPUT_DIR) $(TARBALL)
|
||||
|
||||
.PHONY: clean
|
||||
@@ -2076,6 +2076,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Store a pre-finalization state in the freezer database, applying ops atomically.
|
||||
pub fn put_cold_state(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
state: &BeaconState<E>,
|
||||
) -> Result<(), Error> {
|
||||
let mut ops: Vec<KeyValueStoreOp> = Vec::new();
|
||||
self.store_cold_state(state_root, state, &mut ops)?;
|
||||
self.cold_db.do_atomically(ops)
|
||||
}
|
||||
|
||||
/// Store a pre-finalization state in the freezer database.
|
||||
pub fn store_cold_state(
|
||||
&self,
|
||||
|
||||
@@ -47,4 +47,8 @@ impl HistoricalSummary {
|
||||
state_summary_root: state.state_roots().tree_hash_root(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn block_summary_root(&self) -> Hash256 {
|
||||
self.block_summary_root
|
||||
}
|
||||
}
|
||||
|
||||
79
lcli/src/consume_era_files.rs
Normal file
79
lcli/src/consume_era_files.rs
Normal file
@@ -0,0 +1,79 @@
|
||||
use beacon_chain::era::consumer::EraFileDir;
|
||||
use clap::ArgMatches;
|
||||
use clap_utils::parse_required;
|
||||
use environment::Environment;
|
||||
use eth2_network_config::Eth2NetworkConfig;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
use store::database::interface::BeaconNodeBackend;
|
||||
use store::{HotColdDB, StoreConfig};
|
||||
use tracing::info;
|
||||
use types::EthSpec;
|
||||
|
||||
fn is_dir_non_empty(path: &PathBuf) -> bool {
|
||||
path.exists()
|
||||
&& std::fs::read_dir(path)
|
||||
.map(|mut entries| entries.next().is_some())
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
pub fn run<E: EthSpec>(
|
||||
env: Environment<E>,
|
||||
network_config: Eth2NetworkConfig,
|
||||
matches: &ArgMatches,
|
||||
) -> Result<(), String> {
|
||||
let datadir: PathBuf = parse_required(matches, "datadir")?;
|
||||
let era_dir: PathBuf = parse_required(matches, "era-dir")?;
|
||||
|
||||
let hot_path = datadir.join("chain_db");
|
||||
let cold_path = datadir.join("freezer_db");
|
||||
let blobs_path = datadir.join("blobs_db");
|
||||
|
||||
// Fail fast if database directories already contain data
|
||||
if is_dir_non_empty(&hot_path) || is_dir_non_empty(&cold_path) {
|
||||
return Err(format!(
|
||||
"Database directories are not empty: {} / {}. \
|
||||
This command expects a fresh datadir.",
|
||||
hot_path.display(),
|
||||
cold_path.display(),
|
||||
));
|
||||
}
|
||||
|
||||
let spec = env.eth2_config.spec.clone();
|
||||
|
||||
info!(
|
||||
hot_path = %hot_path.display(),
|
||||
cold_path = %cold_path.display(),
|
||||
era_dir = %era_dir.display(),
|
||||
"Opening database"
|
||||
);
|
||||
|
||||
std::fs::create_dir_all(&hot_path).map_err(|e| format!("Failed to create hot db dir: {e}"))?;
|
||||
std::fs::create_dir_all(&cold_path)
|
||||
.map_err(|e| format!("Failed to create cold db dir: {e}"))?;
|
||||
std::fs::create_dir_all(&blobs_path)
|
||||
.map_err(|e| format!("Failed to create blobs db dir: {e}"))?;
|
||||
|
||||
let db = HotColdDB::<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>::open(
|
||||
&hot_path,
|
||||
&cold_path,
|
||||
&blobs_path,
|
||||
|_, _, _| Ok(()),
|
||||
StoreConfig::default(),
|
||||
spec.clone(),
|
||||
)
|
||||
.map_err(|e| format!("Failed to open database: {e:?}"))?;
|
||||
|
||||
let mut genesis_state = env
|
||||
.runtime()
|
||||
.block_on(network_config.genesis_state::<E>(None, Duration::from_secs(120)))
|
||||
.map_err(|e| format!("Failed to load genesis state: {e}"))?
|
||||
.ok_or("No genesis state available for this network")?;
|
||||
|
||||
let era_file_dir = EraFileDir::new::<E>(&era_dir, &spec)
|
||||
.map_err(|e| format!("Failed to open ERA dir: {e}"))?;
|
||||
|
||||
era_file_dir.import_all(&db, &mut genesis_state, &spec)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,11 +1,13 @@
|
||||
mod block_root;
|
||||
mod check_deposit_data;
|
||||
mod consume_era_files;
|
||||
mod generate_bootnode_enr;
|
||||
mod http_sync;
|
||||
mod indexed_attestations;
|
||||
mod mnemonic_validators;
|
||||
mod mock_el;
|
||||
mod parse_ssz;
|
||||
mod produce_era_files;
|
||||
mod skip_slots;
|
||||
mod state_root;
|
||||
mod transition_blocks;
|
||||
@@ -571,6 +573,50 @@ fn main() {
|
||||
.display_order(0)
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("consume-era-files")
|
||||
.about("Import ERA files into an empty database, producing a ready-to-use beacon node DB.")
|
||||
.arg(
|
||||
Arg::new("datadir")
|
||||
.long("datadir")
|
||||
.value_name("PATH")
|
||||
.action(ArgAction::Set)
|
||||
.required(true)
|
||||
.help("Path to the beacon node data directory (will create chain_db, freezer_db, blobs_db inside).")
|
||||
.display_order(0)
|
||||
)
|
||||
.arg(
|
||||
Arg::new("era-dir")
|
||||
.long("era-dir")
|
||||
.value_name("PATH")
|
||||
.action(ArgAction::Set)
|
||||
.required(true)
|
||||
.help("Directory containing ERA files to import.")
|
||||
.display_order(0)
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("produce-era-files")
|
||||
.about("Produce ERA files from a fully reconstructed beacon node database.")
|
||||
.arg(
|
||||
Arg::new("datadir")
|
||||
.long("datadir")
|
||||
.value_name("PATH")
|
||||
.action(ArgAction::Set)
|
||||
.required(true)
|
||||
.help("Path to the beacon node data directory (containing chain_db, freezer_db, blobs_db).")
|
||||
.display_order(0)
|
||||
)
|
||||
.arg(
|
||||
Arg::new("output-dir")
|
||||
.long("output-dir")
|
||||
.value_name("PATH")
|
||||
.action(ArgAction::Set)
|
||||
.required(true)
|
||||
.help("Directory to write ERA files to. Created if it does not exist.")
|
||||
.display_order(0)
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("http-sync")
|
||||
.about("Manual sync")
|
||||
@@ -765,6 +811,13 @@ fn run<E: EthSpec>(env_builder: EnvironmentBuilder<E>, matches: &ArgMatches) ->
|
||||
}
|
||||
Some(("mock-el", matches)) => mock_el::run::<E>(env, matches)
|
||||
.map_err(|e| format!("Failed to run mock-el command: {}", e)),
|
||||
Some(("consume-era-files", matches)) => {
|
||||
let network_config = get_network_config()?;
|
||||
consume_era_files::run::<E>(env, network_config, matches)
|
||||
.map_err(|e| format!("Failed to consume ERA files: {}", e))
|
||||
}
|
||||
Some(("produce-era-files", matches)) => produce_era_files::run::<E>(env, matches)
|
||||
.map_err(|e| format!("Failed to produce ERA files: {}", e)),
|
||||
Some(("http-sync", matches)) => {
|
||||
let network_config = get_network_config()?;
|
||||
http_sync::run::<E>(env, network_config, matches)
|
||||
|
||||
90
lcli/src/produce_era_files.rs
Normal file
90
lcli/src/produce_era_files.rs
Normal file
@@ -0,0 +1,90 @@
|
||||
use beacon_chain::era::producer;
|
||||
use clap::ArgMatches;
|
||||
use clap_utils::parse_required;
|
||||
use environment::Environment;
|
||||
use std::path::PathBuf;
|
||||
use store::database::interface::BeaconNodeBackend;
|
||||
use store::{HotColdDB, StoreConfig};
|
||||
use tracing::info;
|
||||
use types::EthSpec;
|
||||
|
||||
pub fn run<E: EthSpec>(env: Environment<E>, matches: &ArgMatches) -> Result<(), String> {
|
||||
let datadir: PathBuf = parse_required(matches, "datadir")?;
|
||||
let output_dir: PathBuf = parse_required(matches, "output-dir")?;
|
||||
|
||||
let hot_path = datadir.join("chain_db");
|
||||
let cold_path = datadir.join("freezer_db");
|
||||
let blobs_path = datadir.join("blobs_db");
|
||||
|
||||
let spec = env.eth2_config.spec.clone();
|
||||
|
||||
info!(
|
||||
hot_path = %hot_path.display(),
|
||||
cold_path = %cold_path.display(),
|
||||
output_dir = %output_dir.display(),
|
||||
"Opening database"
|
||||
);
|
||||
|
||||
let db = HotColdDB::<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>::open(
|
||||
&hot_path,
|
||||
&cold_path,
|
||||
&blobs_path,
|
||||
|_, _, _| Ok(()),
|
||||
StoreConfig::default(),
|
||||
spec,
|
||||
)
|
||||
.map_err(|e| format!("Failed to open database: {e:?}"))?;
|
||||
|
||||
let anchor = db.get_anchor_info();
|
||||
let split = db.get_split_info();
|
||||
|
||||
info!(
|
||||
anchor_slot = %anchor.anchor_slot,
|
||||
state_lower_limit = %anchor.state_lower_limit,
|
||||
state_upper_limit = %anchor.state_upper_limit,
|
||||
oldest_block_slot = %anchor.oldest_block_slot,
|
||||
split_slot = %split.slot,
|
||||
"Database info"
|
||||
);
|
||||
|
||||
// Verify reconstruction is complete: state_lower_limit should equal state_upper_limit
|
||||
if !anchor.all_historic_states_stored() {
|
||||
return Err(format!(
|
||||
"State reconstruction is not complete. \
|
||||
state_lower_limit={}, state_upper_limit={}. \
|
||||
Run with --reconstruct-historic-states first.",
|
||||
anchor.state_lower_limit, anchor.state_upper_limit,
|
||||
));
|
||||
}
|
||||
|
||||
// Verify block backfill is complete
|
||||
if anchor.oldest_block_slot > 0 {
|
||||
return Err(format!(
|
||||
"Block backfill is not complete. oldest_block_slot={}. \
|
||||
Complete backfill sync first.",
|
||||
anchor.oldest_block_slot,
|
||||
));
|
||||
}
|
||||
|
||||
let slots_per_historical_root = E::slots_per_historical_root() as u64;
|
||||
// An ERA can only be created if its end slot <= split slot (finalized boundary)
|
||||
let max_era = split.slot.as_u64() / slots_per_historical_root;
|
||||
|
||||
info!(max_era, "Producing ERA files from 0 to max_era");
|
||||
|
||||
std::fs::create_dir_all(&output_dir)
|
||||
.map_err(|e| format!("Failed to create output directory: {e}"))?;
|
||||
|
||||
for era_number in 0..=max_era {
|
||||
producer::create_era_file(&db, era_number, &output_dir)
|
||||
.map_err(|e| format!("Failed to produce ERA file {era_number}: {e}"))?;
|
||||
|
||||
if (era_number + 1) % 100 == 0 || era_number == max_era {
|
||||
info!(era_number, max_era, "Progress");
|
||||
}
|
||||
}
|
||||
|
||||
info!(max_era, "ERA file production complete");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user