Compare commits

...

3 Commits

Author SHA1 Message Date
dapplion
676ad5dbba Add module-level documentation to ERA consumer and producer 2026-03-08 19:45:34 -05:00
dapplion
f368a9a31e Move ERA test vectors to external repo, download at test time
Test vectors are now hosted at dapplion/era-test-vectors and downloaded
via Makefile (same pattern as slashing_protection interchange tests).
2026-03-08 19:42:06 -05:00
dapplion
6cc3d63c8b Implement ERA consumer and producer in lcli 2026-03-08 18:52:27 -05:00
15 changed files with 1625 additions and 41 deletions

161
Cargo.lock generated
View File

@@ -1232,8 +1232,8 @@ dependencies = [
"eth2_network_config",
"ethereum_hashing",
"ethereum_serde_utils",
"ethereum_ssz",
"ethereum_ssz_derive",
"ethereum_ssz 0.10.1",
"ethereum_ssz_derive 0.10.1",
"execution_layer",
"fixed_bytes",
"fork_choice",
@@ -1259,10 +1259,12 @@ dependencies = [
"proto_array",
"rand 0.9.2",
"rayon",
"reth-era",
"safe_arith",
"sensitive_url",
"serde",
"serde_json",
"serde_yaml",
"slasher",
"slot_clock",
"smallvec",
@@ -1508,7 +1510,7 @@ dependencies = [
"blst",
"ethereum_hashing",
"ethereum_serde_utils",
"ethereum_ssz",
"ethereum_ssz 0.10.1",
"fixed_bytes",
"hex",
"rand 0.9.2",
@@ -1555,7 +1557,7 @@ dependencies = [
"clap",
"clap_utils",
"eth2_network_config",
"ethereum_ssz",
"ethereum_ssz 0.10.1",
"hex",
"lighthouse_network",
"log",
@@ -1613,7 +1615,7 @@ dependencies = [
"bls",
"context_deserialize",
"eth2",
"ethereum_ssz",
"ethereum_ssz 0.10.1",
"lighthouse_version",
"mockito",
"reqwest",
@@ -1881,7 +1883,7 @@ dependencies = [
"clap",
"dirs",
"eth2_network_config",
"ethereum_ssz",
"ethereum_ssz 0.10.1",
"hex",
"serde",
"serde_json",
@@ -1900,7 +1902,7 @@ dependencies = [
"environment",
"eth2",
"eth2_config",
"ethereum_ssz",
"ethereum_ssz 0.10.1",
"execution_layer",
"futures",
"genesis",
@@ -2334,6 +2336,16 @@ dependencies = [
"syn 2.0.111",
]
[[package]]
name = "darling"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee"
dependencies = [
"darling_core 0.20.11",
"darling_macro 0.20.11",
]
[[package]]
name = "darling"
version = "0.21.3"
@@ -2354,6 +2366,20 @@ dependencies = [
"darling_macro 0.23.0",
]
[[package]]
name = "darling_core"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim",
"syn 2.0.111",
]
[[package]]
name = "darling_core"
version = "0.21.3"
@@ -2382,6 +2408,17 @@ dependencies = [
"syn 2.0.111",
]
[[package]]
name = "darling_macro"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
dependencies = [
"darling_core 0.20.11",
"quote",
"syn 2.0.111",
]
[[package]]
name = "darling_macro"
version = "0.21.3"
@@ -2506,7 +2543,7 @@ dependencies = [
"alloy-json-abi",
"alloy-primitives",
"bls",
"ethereum_ssz",
"ethereum_ssz 0.10.1",
"hex",
"reqwest",
"serde_json",
@@ -2842,8 +2879,8 @@ dependencies = [
"context_deserialize",
"educe",
"eth2_network_config",
"ethereum_ssz",
"ethereum_ssz_derive",
"ethereum_ssz 0.10.1",
"ethereum_ssz_derive 0.10.1",
"execution_layer",
"fork_choice",
"fs2",
@@ -3123,8 +3160,8 @@ dependencies = [
"enr",
"eth2_keystore",
"ethereum_serde_utils",
"ethereum_ssz",
"ethereum_ssz_derive",
"ethereum_ssz 0.10.1",
"ethereum_ssz_derive 0.10.1",
"futures",
"futures-util",
"libp2p-identity",
@@ -3210,7 +3247,7 @@ dependencies = [
"bytes",
"discv5",
"eth2_config",
"ethereum_ssz",
"ethereum_ssz 0.10.1",
"fixed_bytes",
"kzg",
"pretty_reqwest_error",
@@ -3275,6 +3312,21 @@ dependencies = [
"serde_json",
]
[[package]]
name = "ethereum_ssz"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0dcddb2554d19cde19b099fadddde576929d7a4d0c1cd3512d1fd95cf174375c"
dependencies = [
"alloy-primitives",
"ethereum_serde_utils",
"itertools 0.13.0",
"serde",
"serde_derive",
"smallvec",
"typenum",
]
[[package]]
name = "ethereum_ssz"
version = "0.10.1"
@@ -3292,6 +3344,18 @@ dependencies = [
"typenum",
]
[[package]]
name = "ethereum_ssz_derive"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a657b6b3b7e153637dc6bdc6566ad9279d9ee11a15b12cfb24a2e04360637e9f"
dependencies = [
"darling 0.20.11",
"proc-macro2",
"quote",
"syn 2.0.111",
]
[[package]]
name = "ethereum_ssz_derive"
version = "0.10.1"
@@ -3385,7 +3449,7 @@ dependencies = [
"bytes",
"eth2",
"ethereum_serde_utils",
"ethereum_ssz",
"ethereum_ssz 0.10.1",
"fixed_bytes",
"fork_choice",
"hash-db",
@@ -3583,8 +3647,8 @@ name = "fork_choice"
version = "0.1.0"
dependencies = [
"beacon_chain",
"ethereum_ssz",
"ethereum_ssz_derive",
"ethereum_ssz 0.10.1",
"ethereum_ssz_derive 0.10.1",
"fixed_bytes",
"logging",
"metrics",
@@ -3778,7 +3842,7 @@ version = "0.2.0"
dependencies = [
"bls",
"ethereum_hashing",
"ethereum_ssz",
"ethereum_ssz 0.10.1",
"int_to_bytes",
"merkle_proof",
"rayon",
@@ -4198,7 +4262,7 @@ dependencies = [
"either",
"eth2",
"ethereum_serde_utils",
"ethereum_ssz",
"ethereum_ssz 0.10.1",
"execution_layer",
"fixed_bytes",
"futures",
@@ -4839,8 +4903,8 @@ dependencies = [
"educe",
"ethereum_hashing",
"ethereum_serde_utils",
"ethereum_ssz",
"ethereum_ssz_derive",
"ethereum_ssz 0.10.1",
"ethereum_ssz_derive 0.10.1",
"hex",
"rayon",
"rust_eth_kzg",
@@ -4880,7 +4944,7 @@ dependencies = [
"eth2_network_config",
"eth2_wallet",
"ethereum_hashing",
"ethereum_ssz",
"ethereum_ssz 0.10.1",
"execution_layer",
"fixed_bytes",
"hex",
@@ -5417,8 +5481,8 @@ dependencies = [
"discv5",
"either",
"eth2",
"ethereum_ssz",
"ethereum_ssz_derive",
"ethereum_ssz 0.10.1",
"ethereum_ssz_derive 0.10.1",
"fixed_bytes",
"fnv",
"futures",
@@ -5761,8 +5825,8 @@ dependencies = [
"context_deserialize",
"educe",
"ethereum_hashing",
"ethereum_ssz",
"ethereum_ssz_derive",
"ethereum_ssz 0.10.1",
"ethereum_ssz_derive 0.10.1",
"itertools 0.13.0",
"parking_lot",
"rayon",
@@ -6062,7 +6126,7 @@ dependencies = [
"educe",
"eth2",
"eth2_network_config",
"ethereum_ssz",
"ethereum_ssz 0.10.1",
"execution_layer",
"fixed_bytes",
"fnv",
@@ -6457,8 +6521,8 @@ dependencies = [
"bitvec",
"bls",
"educe",
"ethereum_ssz",
"ethereum_ssz_derive",
"ethereum_ssz 0.10.1",
"ethereum_ssz_derive 0.10.1",
"fixed_bytes",
"itertools 0.14.0",
"maplit",
@@ -7023,8 +7087,8 @@ dependencies = [
name = "proto_array"
version = "0.2.0"
dependencies = [
"ethereum_ssz",
"ethereum_ssz_derive",
"ethereum_ssz 0.10.1",
"ethereum_ssz_derive 0.10.1",
"fixed_bytes",
"safe_arith",
"serde",
@@ -7450,6 +7514,21 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e061d1b48cb8d38042de4ae0a7a6401009d6143dc80d2e2d6f31f0bdd6470c7"
[[package]]
name = "reth-era"
version = "1.9.3"
source = "git+https://github.com/paradigmxyz/reth.git?rev=62abfdaeb54e8a205a8ee085ddebd56047d93374#62abfdaeb54e8a205a8ee085ddebd56047d93374"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-primitives",
"alloy-rlp",
"ethereum_ssz 0.9.1",
"ethereum_ssz_derive 0.9.1",
"snap",
"thiserror 2.0.17",
]
[[package]]
name = "rfc6979"
version = "0.4.0"
@@ -8242,8 +8321,8 @@ dependencies = [
"bls",
"byteorder",
"educe",
"ethereum_ssz",
"ethereum_ssz_derive",
"ethereum_ssz 0.10.1",
"ethereum_ssz_derive 0.10.1",
"filesystem",
"fixed_bytes",
"flate2",
@@ -8407,7 +8486,7 @@ dependencies = [
"context_deserialize",
"educe",
"ethereum_serde_utils",
"ethereum_ssz",
"ethereum_ssz 0.10.1",
"itertools 0.14.0",
"serde",
"serde_derive",
@@ -8431,8 +8510,8 @@ dependencies = [
"bls",
"educe",
"ethereum_hashing",
"ethereum_ssz",
"ethereum_ssz_derive",
"ethereum_ssz 0.10.1",
"ethereum_ssz_derive 0.10.1",
"fixed_bytes",
"int_to_bytes",
"integer-sqrt",
@@ -8459,7 +8538,7 @@ version = "0.1.0"
dependencies = [
"beacon_chain",
"bls",
"ethereum_ssz",
"ethereum_ssz 0.10.1",
"fixed_bytes",
"state_processing",
"tokio",
@@ -8481,8 +8560,8 @@ dependencies = [
"criterion",
"db-key",
"directory",
"ethereum_ssz",
"ethereum_ssz_derive",
"ethereum_ssz 0.10.1",
"ethereum_ssz_derive 0.10.1",
"fixed_bytes",
"itertools 0.14.0",
"leveldb",
@@ -9285,7 +9364,7 @@ checksum = "f7fd51aa83d2eb83b04570808430808b5d24fdbf479a4d5ac5dee4a2e2dd2be4"
dependencies = [
"alloy-primitives",
"ethereum_hashing",
"ethereum_ssz",
"ethereum_ssz 0.10.1",
"smallvec",
"typenum",
]
@@ -9350,8 +9429,8 @@ dependencies = [
"eth2_interop_keypairs",
"ethereum_hashing",
"ethereum_serde_utils",
"ethereum_ssz",
"ethereum_ssz_derive",
"ethereum_ssz 0.10.1",
"ethereum_ssz_derive 0.10.1",
"fixed_bytes",
"hex",
"int_to_bytes",

View File

@@ -207,6 +207,7 @@ rand = "0.9.0"
rayon = "1.7"
regex = "1"
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "stream", "rustls-tls"] }
reth-era = { git = "https://github.com/paradigmxyz/reth.git", rev = "62abfdaeb54e8a205a8ee085ddebd56047d93374" }
ring = "0.17"
rpds = "0.11"
rusqlite = { version = "0.38", features = ["bundled"] }

View File

@@ -50,6 +50,7 @@ parking_lot = { workspace = true }
proto_array = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
reth-era = { workspace = true }
safe_arith = { workspace = true }
sensitive_url = { workspace = true }
serde = { workspace = true }
@@ -79,6 +80,7 @@ maplit = { workspace = true }
mockall = { workspace = true }
mockall_double = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
[[bench]]
name = "benches"

View File

@@ -0,0 +1,507 @@
//! Import ERA files into a Lighthouse database.
//!
//! `EraFileDir` reads a directory of ERA files and imports them sequentially into the cold DB.
//! Each ERA file is verified against `historical_roots` / `historical_summaries` from the
//! highest-numbered ERA state, ensuring block and state integrity without trusting the files.
//!
//! Block roots are cross-checked against the ERA boundary state's `block_roots` vector.
//! Block signatures are NOT verified — ERA files are trusted at that level.
use bls::FixedBytesExtended;
use rayon::prelude::*;
use reth_era::common::file_ops::StreamReader;
use reth_era::era::file::EraReader;
use reth_era::era::types::consensus::{CompressedBeaconState, CompressedSignedBeaconBlock};
use std::fs::{self, File};
use std::path::{Path, PathBuf};
use store::{DBColumn, HotColdDB, ItemStore, KeyValueStoreOp};
use tracing::{debug, debug_span, info, instrument, warn};
use tree_hash::TreeHash;
use types::{
BeaconState, ChainSpec, EthSpec, Hash256, HistoricalBatch, HistoricalSummary,
SignedBeaconBlock, Slot,
};
/// Decompress an ERA block record and SSZ-decode it into a `SignedBeaconBlock`.
///
/// Returns a human-readable error string if either decompression or SSZ
/// decoding fails.
fn decode_block<E: EthSpec>(
    compressed: CompressedSignedBeaconBlock,
    spec: &ChainSpec,
) -> Result<SignedBeaconBlock<E>, String> {
    match compressed.decompress() {
        Ok(ssz_bytes) => SignedBeaconBlock::from_ssz_bytes(&ssz_bytes, spec)
            .map_err(|error| format!("failed to decode block: {error:?}")),
        Err(error) => Err(format!("failed to decompress block: {error:?}")),
    }
}
/// Decompress an ERA state record and SSZ-decode it into a `BeaconState`.
///
/// Returns a human-readable error string if either decompression or SSZ
/// decoding fails.
fn decode_state<E: EthSpec>(
    compressed: CompressedBeaconState,
    spec: &ChainSpec,
) -> Result<BeaconState<E>, String> {
    compressed
        .decompress()
        .map_err(|error| format!("failed to decompress state: {error:?}"))
        .and_then(|ssz_bytes| {
            BeaconState::from_ssz_bytes(&ssz_bytes, spec)
                .map_err(|error| format!("failed to decode state: {error:?}"))
        })
}
/// Handle to a directory of ERA files, plus the verification data captured from
/// the boundary state of the highest-numbered ERA file (the trust anchor).
pub struct EraFileDir {
    // Directory containing the `{network}-{era:05}-{short_root}.era` files.
    dir: PathBuf,
    // Network/config name, used for file naming and for the reth-era reader.
    network_name: String,
    // genesis_validators_root of the reference (highest) ERA state; used to
    // check that the files belong to the expected network.
    genesis_validators_root: Hash256,
    // `state.historical_roots` from the reference state (frozen at Capella).
    historical_roots: Vec<Hash256>,
    // `state.historical_summaries` from the reference state (empty pre-Capella).
    historical_summaries: Vec<HistoricalSummary>,
    // Highest era number found in the directory.
    max_era: u64,
}
impl EraFileDir {
    /// Open a directory of ERA files and build the verification context.
    ///
    /// The boundary state of the highest-numbered era file acts as the trust
    /// anchor: its `historical_roots` / `historical_summaries` and
    /// `genesis_validators_root` are captured and later used to verify every
    /// older era file. Also checks that the full sequence of files
    /// `0..=max_era` is present under the expected names.
    ///
    /// Errors if the directory is empty, the reference file cannot be parsed,
    /// or any expected file is missing.
    pub fn new<E: EthSpec>(era_files_dir: &Path, spec: &ChainSpec) -> Result<Self, String> {
        let mut era_files = list_era_files(era_files_dir)?;
        era_files.sort_by_key(|(era_number, _)| *era_number);
        let network_name = spec
            .config_name
            .clone()
            .unwrap_or_else(|| "unknown".to_string());
        // The highest era's state is the trust anchor for all older eras.
        let Some((max_era, reference_path)) = era_files.last().cloned() else {
            return Err("era files directory is empty".to_string());
        };
        let reference_state = read_era_state::<E>(&reference_path, &network_name, spec)?;
        // historical_roots was frozen in Capella, and continued as historical_summaries.
        let historical_roots = reference_state.historical_roots().to_vec();
        // Pre-Capella states don't have the historical_summaries field.
        let historical_summaries = match reference_state.historical_summaries() {
            Ok(list) => list.to_vec(),
            Err(_) => vec![],
        };
        let dir = era_files_dir.to_path_buf();
        let era_dir = Self {
            dir,
            network_name,
            genesis_validators_root: reference_state.genesis_validators_root(),
            historical_roots,
            historical_summaries,
            max_era,
        };
        // Verify that every expected era file name exists in the directory.
        for era_number in 0..=era_dir.max_era {
            let expected = era_dir.expected_path(era_number);
            if !expected.exists() {
                return Err(format!("missing era file: {expected:?}"));
            }
        }
        Ok(era_dir)
    }
    /// Highest era number found in the directory.
    pub fn max_era(&self) -> u64 {
        self.max_era
    }
    /// `genesis_validators_root` from the reference (highest) era state.
    pub fn genesis_validators_root(&self) -> Hash256 {
        self.genesis_validators_root
    }
    /// Import all ERA files into a fresh store, verifying genesis and importing ERAs 1..=max_era.
    ///
    /// The genesis state is written to the cold DB directly; era file 0
    /// (genesis-only) is not imported through `import_era_file`. Progress is
    /// logged every 100 eras and on completion.
    pub fn import_all<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
        &self,
        store: &HotColdDB<E, Hot, Cold>,
        genesis_state: &mut BeaconState<E>,
        spec: &ChainSpec,
    ) -> Result<(), String> {
        // Guard against importing files produced for a different network.
        if self.genesis_validators_root != genesis_state.genesis_validators_root() {
            return Err(format!(
                "ERA files genesis_validators_root ({:?}) does not match network genesis ({:?}). \
                Are the ERA files from the correct network?",
                self.genesis_validators_root,
                genesis_state.genesis_validators_root(),
            ));
        }
        let genesis_root = genesis_state
            .canonical_root()
            .map_err(|e| format!("Failed to hash genesis state: {e:?}"))?;
        store
            .put_cold_state(&genesis_root, genesis_state)
            .map_err(|e| format!("Failed to store genesis state: {e:?}"))?;
        let start = std::time::Instant::now();
        for era_number in 1..=self.max_era {
            self.import_era_file(store, era_number, spec, None)?;
            // Log throughput periodically and once at the end.
            if era_number % 100 == 0 || era_number == self.max_era {
                let elapsed = start.elapsed();
                let rate = era_number as f64 / elapsed.as_secs_f64();
                info!(
                    era_number,
                    max_era = self.max_era,
                    ?elapsed,
                    rate = format!("{rate:.1} era/s"),
                    "Progress"
                );
            }
        }
        info!(
            max_era = self.max_era,
            elapsed = ?start.elapsed(),
            "ERA file import complete"
        );
        Ok(())
    }
    /// Import a single era file: verify it against the trust anchor, then write
    /// its blocks, block-root index, and boundary state to the store.
    ///
    /// If `trusted_state` is provided, the era state must additionally match
    /// the given `(state_root, slot)` pair. Block signatures are NOT verified.
    #[instrument(level = "debug", skip_all, fields(era_number = %era_number))]
    pub fn import_era_file<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
        &self,
        store: &HotColdDB<E, Hot, Cold>,
        era_number: u64,
        spec: &ChainSpec,
        trusted_state: Option<(Hash256, Slot)>,
    ) -> Result<(), String> {
        let path = self.expected_path(era_number);
        debug!(?path, era_number, "Importing era file");
        let file = File::open(path).map_err(|error| format!("failed to open era file: {error}"))?;
        let era_file = {
            let _span = debug_span!("era_import_read").entered();
            EraReader::new(file)
                .read_and_assemble(self.network_name.clone())
                .map_err(|error| format!("failed to parse era file: {error:?}"))?
        };
        // Consistency checks: ensure the era state matches the expected historical root and that
        // each block root matches the state block_roots for its slot.
        let mut state = {
            let _span = debug_span!("era_import_decode_state").entered();
            decode_state::<E>(era_file.group.era_state, spec)?
        };
        // Verify trusted state root if provided.
        if let Some((expected_root, expected_slot)) = trusted_state {
            if state.slot() != expected_slot {
                return Err(format!(
                    "trusted slot mismatch: expected {expected_slot}, got {}",
                    state.slot()
                ));
            }
            let actual_root = state
                .canonical_root()
                .map_err(|e| format!("Failed to compute state root: {e:?}"))?;
            if actual_root != expected_root {
                return Err(format!(
                    "trusted state root mismatch at slot {expected_slot}: \
                    expected {expected_root:?}, got {actual_root:?}"
                ));
            }
        }
        // The root derived from the era state itself must match the one the
        // trust anchor (reference state) recorded for this era.
        let expected_root = self
            .era_file_name_root(era_number)
            .ok_or_else(|| format!("missing historical root for era {era_number}"))?;
        let actual_root = era_root_from_state(&state, era_number)?;
        if expected_root != actual_root {
            return Err(format!(
                "era root mismatch for era {era_number}: expected {expected_root:?}, got {actual_root:?}"
            ));
        }
        // Era N's boundary state sits exactly at slot N * SLOTS_PER_HISTORICAL_ROOT.
        let slots_per_historical_root = E::slots_per_historical_root() as u64;
        let end_slot = Slot::new(era_number * slots_per_historical_root);
        if state.slot() != end_slot {
            return Err(format!(
                "era state slot mismatch: expected {end_slot}, got {}",
                state.slot()
            ));
        }
        let historical_summaries_active_at_slot = match store.spec.capella_fork_epoch {
            // For mainnet case, Capella activates at 194048 epoch = 6209536 slot = 758 era number.
            // The last epoch processing before capella transition adds a last entry to historical
            // roots. So historical_roots.len() == 758 at the capella fork boundary. An ERA file
            // includes the state AFTER advanced (or applying) the block at the final slot, so the
            // state for ERA file 758 is of the Capella variant. However, historical summaries are
            // still empty.
            Some(epoch) => {
                epoch.start_slot(E::slots_per_epoch())
                    + Slot::new(E::slots_per_historical_root() as u64)
            }
            None => Slot::max_value(),
        };
        // Check that the block roots vector in this state match the historical summary in the last
        // state. Asserts that the blocks are exactly the expected ones given a trusted final state.
        if era_number == 0 {
            // Skip checking genesis state era file for now
        } else if state.slot() >= historical_summaries_active_at_slot {
            // Post-capella state, check against historical summaries
            // ```py
            // historical_summary = HistoricalSummary(
            //     block_summary_root=hash_tree_root(state.block_roots),
            //     state_summary_root=hash_tree_root(state.state_roots),
            // )
            // state.historical_summaries.append(historical_summary)
            // ```
            let index = era_number.saturating_sub(1) as usize;
            // historical_summaries started to be appended after capella, so we need to offset
            let summary_index = index
                .checked_sub(self.historical_roots.len())
                .ok_or_else(|| format!(
                    "Not enough historical roots era number {era_number} index {index} historical_roots len {}",
                    self.historical_roots.len()
                ))?;
            let expected_root = self
                .historical_summaries
                .get(summary_index)
                .ok_or_else(|| format!("missing historical summary for era {era_number}"))?
                .block_summary_root();
            let actual_root = state.block_roots().tree_hash_root();
            if actual_root != expected_root {
                return Err(format!(
                    "block summary root post-capella mismatch for era {}: {:?} != {:?}",
                    era_number, expected_root, actual_root
                ));
            }
        } else {
            // Pre-capella state, check against historical roots
            // ```py
            // historical_batch = HistoricalBatch(block_roots=state.block_roots, state_roots=state.state_roots)
            // state.historical_roots.append(hash_tree_root(historical_batch))
            // ```
            let index = era_number.saturating_sub(1) as usize;
            let expected_root = *self
                .historical_roots
                .get(index)
                .ok_or_else(|| format!("missing historical root for era {era_number}"))?;
            let historical_batch = HistoricalBatch::<E> {
                block_roots: state.block_roots().clone(),
                state_roots: state.state_roots().clone(),
            };
            let actual_root = historical_batch.tree_hash_root();
            if actual_root != expected_root {
                return Err(format!(
                    "block summary root pre-capella mismatch for era {}: {:?} != {:?}",
                    era_number, expected_root, actual_root
                ));
            }
        }
        debug!(era_number, "Importing blocks from era file");
        // TODO(era): Block signatures are not verified here and are trusted.
        // decode and hash is split in two loops to track timings better. If we add spans for each
        // block it's too short and the data is not really useful.
        let decoded_blocks = {
            let _span = debug_span!("era_import_decode_blocks").entered();
            era_file
                .group
                .blocks
                .into_par_iter()
                .map(|compressed_block| decode_block::<E>(compressed_block, spec))
                .collect::<Result<Vec<_>, _>>()?
        };
        let blocks_with_roots = {
            let _span = debug_span!("era_import_hash_blocks").entered();
            decoded_blocks
                .into_par_iter()
                .map(|block| (block.canonical_root(), block))
                .collect::<Vec<_>>()
        };
        let mut block_ops = vec![];
        {
            // Bind the guard to `_span` (not `_`): `let _ = ...entered()` drops the
            // `EnteredSpan` immediately, so the span would close right away and
            // record no time for this scope.
            let _span = debug_span!("era_import_db_ops_blocks").entered();
            for (block_root, block) in blocks_with_roots {
                let slot = block.slot();
                // Check consistency that this block is expected w.r.t. the state in the era file.
                // Since we check that the state block roots match the historical summary, we know that
                // this block root is the expected one.
                let expected_block_root = state
                    .get_block_root(slot)
                    .map_err(|error| format!("failed to read block root {slot}: {error:?}"))?;
                if *expected_block_root != block_root {
                    return Err(format!(
                        "block root mismatch at slot {slot}: expected {expected_block_root:?}, got {block_root:?}"
                    ));
                }
                store
                    .block_as_kv_store_ops(&block_root, block, &mut block_ops)
                    .map_err(|error| format!("failed to store block: {error:?}"))?;
            }
        }
        {
            // Same as above: keep the guard alive for the whole write.
            let _span = debug_span!("era_import_write_blocks").entered();
            store
                .hot_db
                .do_atomically(block_ops)
                .map_err(|error| format!("failed to store blocks: {error:?}"))?;
        }
        // Populate the cold DB slot -> block root index from the state.block_roots()
        {
            let _span = debug_span!("era_import_write_block_index").entered();
            write_block_root_index_for_era(store, &state, era_number)?;
        }
        debug!(era_number, "Importing state from era file");
        {
            let _span = debug_span!("era_import_write_state").entered();
            let state_root = state
                .canonical_root()
                .map_err(|error| format!("failed to hash state: {error:?}"))?;
            // Use put_cold_state as the split is not updated and we need the state into the cold store.
            store
                .put_cold_state(&state_root, &state)
                .map_err(|error| format!("failed to store state: {error:?}"))?;
        }
        Ok(())
    }
    /// Path where the era file for `era_number` is expected, following the
    /// `{network}-{era:05}-{short_root}.era` naming convention.
    fn expected_path(&self, era_number: u64) -> PathBuf {
        // Fall back to the zero hash if the root is unknown; the resulting path
        // simply won't exist and callers report it as a missing file.
        let root = self
            .era_file_name_root(era_number)
            .unwrap_or_else(Hash256::zero);
        let short = root
            .as_slice()
            .iter()
            .take(4)
            .map(|byte| format!("{byte:02x}"))
            .collect::<String>();
        let filename = format!("{}-{era_number:05}-{short}.era", self.network_name);
        self.dir.join(filename)
    }
    /// Root used in the era file name for `era_number`.
    ///
    /// short-era-root is the first 4 bytes of the last historical root in the last state in the
    /// era file, lower-case hex-encoded (8 characters), except the genesis era which instead
    /// uses the genesis_validators_root field from the genesis state.
    /// - The root is available as state.historical_roots[era - 1] except for genesis, which is
    ///   state.genesis_validators_root
    /// - Post-Capella, the root must be computed from
    ///   `state.historical_summaries[era - state.historical_roots.len - 1]`
    fn era_file_name_root(&self, era_number: u64) -> Option<Hash256> {
        if era_number == 0 {
            return Some(self.genesis_validators_root);
        }
        let index = era_number.saturating_sub(1) as usize;
        if let Some(root) = self.historical_roots.get(index) {
            return Some(*root);
        }
        // `index >= historical_roots.len()` here, so the subtraction cannot underflow.
        let summary_index = index.saturating_sub(self.historical_roots.len());
        self.historical_summaries
            .get(summary_index)
            .map(|summary| summary.tree_hash_root())
    }
}
/// Open the era file at `path`, parse it, and decode its boundary state.
fn read_era_state<E: EthSpec>(
    path: &Path,
    network_name: &str,
    spec: &ChainSpec,
) -> Result<BeaconState<E>, String> {
    let file = match File::open(path) {
        Ok(file) => file,
        Err(error) => return Err(format!("failed to open era file: {error}")),
    };
    let era_file = EraReader::new(file)
        .read_and_assemble(network_name.to_string())
        .map_err(|error| format!("failed to parse era file: {error:?}"))?;
    decode_state::<E>(era_file.group.era_state, spec)
}
/// Recover the era root for `era_number` from an era boundary state.
///
/// Era 0 is identified by the genesis validators root; pre-Capella eras by
/// `historical_roots[era - 1]`; post-Capella eras by the tree hash root of the
/// matching entry in `historical_summaries`.
fn era_root_from_state<E: EthSpec>(
    state: &BeaconState<E>,
    era_number: u64,
) -> Result<Hash256, String> {
    if era_number == 0 {
        return Ok(state.genesis_validators_root());
    }
    let index = era_number
        .checked_sub(1)
        .ok_or_else(|| "invalid era number".to_string())? as usize;
    // Pre-Capella eras are committed to by `historical_roots`.
    if let Some(root) = state.historical_roots().get(index) {
        return Ok(*root);
    }
    // Post-Capella eras live in `historical_summaries`, offset by the number of
    // frozen historical roots.
    match state.historical_summaries() {
        Ok(summaries) => {
            let summary_index = index.saturating_sub(state.historical_roots().len());
            summaries
                .get(summary_index)
                .map(|summary| summary.tree_hash_root())
                .ok_or_else(|| "missing historical summary".to_string())
        }
        Err(_) => Err(format!("missing historical root for era {era_number}")),
    }
}
/// Populate the cold DB `BeaconBlockRoots` index for one era from the boundary
/// state's `block_roots` vector.
///
/// Writes one `slot -> block_root` entry for every slot in
/// `[end_slot - SLOTS_PER_HISTORICAL_ROOT, end_slot)` as a single atomic batch.
/// Errors if the state's slot does not match the era boundary, or if any root
/// read or DB write fails.
fn write_block_root_index_for_era<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
    store: &HotColdDB<E, Hot, Cold>,
    state: &BeaconState<E>,
    era_number: u64,
) -> Result<(), String> {
    let end_slot = state.slot();
    let slots_per_historical_root = E::slots_per_historical_root() as u64;
    // Sanity-check that the provided state really is this era's boundary state.
    let expected_end_slot = Slot::new(era_number * slots_per_historical_root);
    if end_slot != expected_end_slot {
        return Err(format!(
            "era state slot mismatch: expected {expected_end_slot}, got {end_slot}"
        ));
    }
    let start_slot = end_slot.saturating_sub(slots_per_historical_root);
    // Build all key-value ops up front; keys are big-endian slot numbers.
    let ops = (start_slot.as_u64()..end_slot.as_u64())
        .map(|slot_u64| {
            let slot = Slot::new(slot_u64);
            let block_root = state
                .get_block_root(slot)
                .map_err(|error| format!("failed to read block root {slot}: {error:?}"))?;
            // TODO(era): Should we write BeaconBlockRoots for missed slots?
            Ok(KeyValueStoreOp::PutKeyValue(
                DBColumn::BeaconBlockRoots,
                slot_u64.to_be_bytes().to_vec(),
                block_root.as_slice().to_vec(),
            ))
        })
        .collect::<Result<Vec<_>, String>>()?;
    store
        .cold_db
        .do_atomically(ops)
        .map_err(|error| format!("failed to store block root index: {error:?}"))?;
    Ok(())
}
/// Scan `dir` for `*.era` files and return their `(era_number, path)` pairs.
///
/// File names are expected to look like `{network}-{era:05}-{short_root}.era`;
/// anything that does not match this shape is silently skipped. Emits a
/// warning when no era files are found at all.
fn list_era_files(dir: &Path) -> Result<Vec<(u64, PathBuf)>, String> {
    let entries = fs::read_dir(dir).map_err(|error| format!("failed to read era dir: {error}"))?;
    let mut era_files = Vec::new();
    for entry in entries {
        let path = entry
            .map_err(|error| format!("failed to read era entry: {error}"))?
            .path();
        // Parse `{network}-{era}-{short}.era`, working from the right so that
        // network names containing '-' are handled.
        let era_number = path
            .file_name()
            .and_then(|name| name.to_str())
            .filter(|name| name.ends_with(".era"))
            .and_then(|name| name.rsplit_once('-'))
            .and_then(|(prefix, _hash_part)| prefix.rsplit_once('-'))
            .and_then(|(_network_name, era_part)| era_part.parse().ok());
        if let Some(era_number) = era_number {
            era_files.push((era_number, path));
        }
    }
    if era_files.is_empty() {
        warn!(?dir, "Era files directory is empty");
    }
    Ok(era_files)
}

View File

@@ -0,0 +1,18 @@
//! ERA file support for importing and exporting historical beacon chain data.
//!
//! ERA files store beacon blocks and states in a standardized archive format, enabling
//! efficient distribution of historical chain data between clients. Each ERA file covers
//! one "era" of `SLOTS_PER_HISTORICAL_ROOT` slots (8192 on mainnet) and contains:
//! - All beacon blocks in the slot range
//! - The boundary `BeaconState` at the end of the range
//!
//! Verification relies on `historical_roots` (pre-Capella) and `historical_summaries`
//! (post-Capella) which commit to the block and state roots for each era.
//!
//! Spec: <https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#historical-roots-updates>
//! Format: <https://github.com/status-im/nimbus-eth2/blob/stable/docs/the_auditors_handbook/src/02.4_the_era_file_format.md>
pub mod consumer;
pub mod producer;
#[cfg(test)]
mod tests;

View File

@@ -0,0 +1,332 @@
//! Export ERA files from a Lighthouse database.
//!
//! Reads blocks and states from the cold DB and writes them into the standardized ERA format.
//! ERA files are produced either during historical reconstruction (backfilling) or upon
//! finalization of new eras. Each file is named `{network}-{era_number}-{short_root}.era`.
//!
//! Files are written atomically (temp file + rename) so partial writes never appear as valid
//! ERA files. Existing files are skipped, making production idempotent.
use rand::random;
use reth_era::common::file_ops::{EraFileFormat, EraFileId, StreamWriter};
use reth_era::era::file::{EraFile, EraWriter};
use reth_era::era::types::consensus::{CompressedBeaconState, CompressedSignedBeaconBlock};
use reth_era::era::types::group::{EraGroup, EraId, SlotIndex};
use ssz::Encode;
use std::fs::{self, File, OpenOptions};
use std::path::Path;
use store::{HotColdDB, ItemStore};
use tracing::{error, info};
use tree_hash::TreeHash;
use types::{BeaconState, EthSpec, Slot};
/// Return true if a file with the exact name derived from `id` exists in `dir`.
fn era_file_exists(dir: &Path, id: &EraId) -> bool {
    let path = dir.join(id.to_file_name());
    path.exists()
}
/// Return true if `dir` contains any era file for `era_number` on
/// `network_name`, regardless of the short-root suffix in its name.
///
/// Matches file names of the form `{network_name}-{era_number:05}-*.era`.
/// An unreadable directory is treated as "no file present".
fn era_file_exists_for_number(dir: &Path, network_name: &str, era_number: u64) -> bool {
    let prefix = format!("{}-{:05}-", network_name, era_number);
    let Ok(entries) = fs::read_dir(dir) else {
        return false;
    };
    entries.flatten().any(|entry| {
        let file_name = entry.file_name();
        file_name
            .to_str()
            .is_some_and(|name| name.starts_with(&prefix) && name.ends_with(".era"))
    })
}
/// Produce era files for every era fully covered by reconstructed history.
///
/// Errors are logged and stop the loop; already-existing files are skipped
/// inside `create_era_file`.
#[allow(dead_code)]
pub(crate) fn maybe_produce_reconstruction_eras<
    E: EthSpec,
    Hot: ItemStore<E>,
    Cold: ItemStore<E>,
>(
    db: &HotColdDB<E, Hot, Cold>,
    output_dir: &Path,
) {
    let state_lower_limit = db.get_anchor_info().state_lower_limit;
    let max_era = state_lower_limit.as_u64() / E::slots_per_historical_root() as u64;
    let mut era_number = 0;
    while era_number <= max_era {
        match create_era_file(db, era_number, output_dir) {
            Ok(()) => era_number += 1,
            Err(error) => {
                error!(
                    ?error,
                    era_number, "Era producer failed during reconstruction"
                );
                break;
            }
        }
    }
}
/// Produce era files for every era fully covered by the available, finalized
/// slot range. Errors are logged and stop the loop; existing files are skipped
/// inside `create_era_file`.
#[allow(dead_code)]
pub(crate) fn maybe_produce_finalization_era<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
    db: &HotColdDB<E, Hot, Cold>,
    output_dir: &Path,
    finalized_slot: Slot,
) {
    // This is the oldest slot for which we have a state and blocks available
    let anchor_slot = db.get_anchor_info().anchor_slot;
    // And finalized_slot is the most recent for which we have finalized state and blocks available
    // We can produce an era file for era_number if
    // - anchor_slot <= start_slot(era_number) AND
    // - finalized_slot >= end_slot(era_number)
    let slots_per_hr = E::slots_per_historical_root() as u64;
    let lowest_era_file = anchor_slot.as_u64() / slots_per_hr;
    // NOTE(review): when `finalized_slot == N * slots_per_hr` exactly, this formula
    // yields `max_era_file = N - 1`, one era fewer than the `finalized_slot >=
    // end_slot(era_number)` condition above permits (`finalized_slot / slots_per_hr`
    // would give N). Confirm whether this extra-conservative boundary is intentional.
    // `checked_sub` returns None (skip) before the first era is producible.
    let Some(max_era_file) = ((finalized_slot.as_u64() + 1) / slots_per_hr).checked_sub(1) else {
        return;
    };
    for era_number in lowest_era_file..=max_era_file {
        if let Err(error) = create_era_file(db, era_number, output_dir) {
            error!(
                ?error,
                era_number, "Era producer failed during finalization"
            );
            break;
        }
    }
}
/// Build and atomically write the era file for `era_number` into `output_dir`.
///
/// Idempotent: returns `Ok(())` without doing anything when a matching file
/// already exists. Loads the boundary state at `end_slot(era_number)` from the
/// cold DB, assembles the ERA group, and writes it via temp-file + rename.
pub fn create_era_file<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
    db: &HotColdDB<E, Hot, Cold>,
    era_number: u64,
    output_dir: &Path,
) -> Result<(), String> {
    let network_name = match &db.spec.config_name {
        Some(name) => name.clone(),
        None => "unknown".to_string(),
    };
    // Cheap existence check (any short-root suffix) before touching the DB.
    if era_file_exists_for_number(output_dir, &network_name, era_number) {
        return Ok(());
    }
    let end_slot = Slot::new(era_number * E::slots_per_historical_root() as u64);
    let mut state = db
        .load_cold_state_by_slot(end_slot)
        .map_err(|error| format!("failed to load era state: {error:?}"))?;
    let actual_slot = state.slot();
    if actual_slot != end_slot {
        return Err(format!(
            "era state slot mismatch: expected {}, got {}",
            end_slot, actual_slot
        ));
    }
    let group = build_era_group(db, &mut state, era_number)?;
    let file_id = era_file_id::<E>(&network_name, era_number, &mut state)?;
    let file = EraFile::new(group, file_id);
    fs::create_dir_all(output_dir)
        .map_err(|error| format!("failed to create era files dir: {error}"))?;
    // Re-check with the exact file name now that the short root is known.
    if !era_file_exists(output_dir, file.id()) {
        write_era_file_atomic(output_dir, &file)?;
        info!(
            era_number,
            file = %file.id().to_file_name(),
            "Wrote era file"
        );
    }
    Ok(())
}
/// Assemble the ERA group (blocks + boundary state + slot indices) for one era.
///
/// Byte offsets for the slot indices are accumulated while appending records:
/// `offset` tracks the running position within the file body, each record is
/// preceded by an 8-byte entry header, and index entries are stored relative
/// to the start of the index record they live in (see inline comments).
fn build_era_group<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
    db: &HotColdDB<E, Hot, Cold>,
    state: &mut BeaconState<E>,
    era_number: u64,
) -> Result<EraGroup, String> {
    // Era file 0 goes from slot 0 to 0, genesis state only
    let start_slot =
        Slot::new(era_number.saturating_sub(1) * E::slots_per_historical_root() as u64);
    let end_slot = Slot::new(era_number * E::slots_per_historical_root() as u64);
    let compressed_state = CompressedBeaconState::from_ssz(&state.as_ssz_bytes())
        .map_err(|error| format!("failed to compress state: {error:?}"))?;
    // Each entry has an 8-byte header; the version record is header-only.
    let mut offset: i64 = 8;
    let mut blocks: Vec<CompressedSignedBeaconBlock> = Vec::new();
    // (slot, absolute offset of the block's data) for each block we include.
    let mut block_data_starts: Vec<(Slot, i64)> = Vec::new();
    // The era file number 0 contains the genesis state and nothing else
    if era_number > 0 {
        for slot_u64 in start_slot.as_u64()..end_slot.as_u64() {
            let slot = Slot::new(slot_u64);
            let block_root = state
                .get_block_root(slot)
                .map_err(|error| format!("failed to read block root {slot}: {error:?}"))?;
            // Skip duplicate blocks (same root as previous slot), but only within this ERA
            if slot_u64 > start_slot.as_u64()
                && let Ok(prev_root) = state.get_block_root(Slot::new(slot_u64 - 1))
                && prev_root == block_root
            {
                continue;
            }
            let block = db
                .get_full_block(block_root)
                .map_err(|error| format!("failed to load block: {error:?}"))?
                .ok_or_else(|| format!("missing block for root {block_root:?}"))?;
            let compressed = CompressedSignedBeaconBlock::from_ssz(&block.as_ssz_bytes())
                .map_err(|error| format!("failed to compress block: {error:?}"))?;
            let data_len = compressed.data.len() as i64;
            // Data starts after this entry's 8-byte header.
            let data_start = offset + 8;
            blocks.push(compressed);
            block_data_starts.push((slot, data_start));
            offset += 8 + data_len;
        }
    }
    let state_data_len = compressed_state.data.len() as i64;
    // Data starts after the 8-byte header.
    let state_data_start = offset + 8;
    offset += 8 + state_data_len;
    let block_index_start = offset;
    let slot_count = E::slots_per_historical_root();
    // SlotIndex layout: starting_slot (8) + offsets (slot_count * 8) + count (8).
    let block_index_len = 8 + slot_count as i64 * 8 + 8;
    // Only non-genesis era files carry a block index record.
    if era_number > 0 {
        offset += 8 + block_index_len;
    }
    let state_index_start = offset;
    // Offset is relative to the start of the slot index data (after the 8-byte header)
    let state_offset = state_data_start - (state_index_start + 8);
    let state_slot_index = SlotIndex::new(end_slot.as_u64(), vec![state_offset]);
    let group = if era_number > 0 {
        // Slots without a block record keep the default offset of 0.
        let mut offsets = vec![0i64; slot_count];
        for (slot, data_start) in &block_data_starts {
            let slot_index = slot
                .as_u64()
                .checked_sub(start_slot.as_u64())
                .ok_or_else(|| "slot underflow while building block index".to_string())?
                as usize;
            offsets[slot_index] = *data_start - block_index_start;
        }
        let block_index = SlotIndex::new(start_slot.as_u64(), offsets);
        EraGroup::with_block_index(blocks, compressed_state, block_index, state_slot_index)
    } else {
        EraGroup::new(blocks, compressed_state, state_slot_index)
    };
    Ok(group)
}
/// Compute the 4-byte short hash embedded in an era file name.
///
/// Era 0 derives it from the genesis validators root; later eras use the entry
/// for era `era_number - 1`, taken from `historical_roots` when it is long
/// enough and otherwise from the tree hash root of the corresponding
/// `historical_summaries` entry.
fn short_historical_root<E: EthSpec>(
    state: &mut BeaconState<E>,
    era_number: u64,
) -> Result<[u8; 4], String> {
    let full_root = if era_number == 0 {
        state.genesis_validators_root()
    } else {
        let era_index = era_number
            .checked_sub(1)
            .ok_or_else(|| "era index underflow".to_string())?;
        let historical_len = state.historical_roots_mut().len() as u64;
        if era_index < historical_len {
            *state
                .historical_roots_mut()
                .get(era_index as usize)
                .ok_or_else(|| "historical root missing".to_string())?
        } else {
            // Past the end of `historical_roots`: index into the summaries list.
            let summary_index = era_index
                .checked_sub(historical_len)
                .ok_or_else(|| "historical summary index underflow".to_string())?;
            state
                .historical_summaries_mut()
                .map_err(|error| format!("failed to access historical summaries: {error:?}"))?
                .get(summary_index as usize)
                .ok_or_else(|| "historical summary missing".to_string())?
                .tree_hash_root()
        }
    };
    // Keep only the first four bytes of the 32-byte root.
    let mut short = [0u8; 4];
    short.copy_from_slice(&full_root.as_slice()[..4]);
    Ok(short)
}
/// Write an era file atomically using a temp file + rename.
///
/// If the process crashes mid-write, only the temp file is left behind; the final file is
/// created via rename, so it is either complete or absent. The era file existence check only
/// considers `.era` files, so partial temp files are ignored safely.
fn write_era_file_atomic(output_dir: &Path, file: &EraFile) -> Result<(), String> {
    let filename = file.id().to_file_name();
    let final_path = output_dir.join(&filename);
    if final_path.exists() {
        return Ok(());
    }
    // Create a unique temp file and write the full era contents into it.
    // Base the temp name on the target filename (previously a stray placeholder)
    // so leftover temp files are attributable, and suffix a random nonce so
    // concurrent writers of the same era never collide on `create_new`.
    let tmp_name = format!("{filename}.tmp-{:016x}", random::<u64>());
    let tmp_path = output_dir.join(tmp_name);
    let mut file_handle = OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(&tmp_path)
        .map_err(|error| format!("failed to create temp file: {error}"))?;
    {
        // Scope the writer so its borrow of the handle ends before the fsync.
        let mut writer = EraWriter::new(&mut file_handle);
        writer
            .write_file(file)
            .map_err(|error| format!("failed to write era file: {error:?}"))?;
    }
    file_handle
        .sync_all()
        .map_err(|error| format!("failed to fsync era temp file: {error}"))?;
    // Atomically publish; if another writer won, clean up and exit.
    if let Err(error) = fs::rename(&tmp_path, &final_path) {
        if error.kind() == std::io::ErrorKind::AlreadyExists && final_path.exists() {
            let _ = fs::remove_file(&tmp_path);
            return Ok(());
        }
        return Err(format!("failed to rename era temp file: {error}"));
    }
    // Best-effort directory sync to make the rename durable.
    if let Ok(dir_handle) = File::open(output_dir) {
        let _ = dir_handle.sync_all();
    }
    Ok(())
}
/// Build the `EraId` (network name, start slot, slot count, short hash) that
/// determines the era file's name.
fn era_file_id<E: EthSpec>(
    network_name: &str,
    era_number: u64,
    state: &mut BeaconState<E>,
) -> Result<EraId, String> {
    // reth_era uses hardcoded SLOTS_PER_HISTORICAL_ROOT=8192 to compute era number from start_slot.
    // To get the correct filename era number, we pass era_number * 8192 as the start_slot.
    const RETH_SLOTS_PER_HISTORICAL_ROOT: u64 = 8192;
    let short_hash = short_historical_root(state, era_number)?;
    // Era 0 holds only the genesis state, hence a slot count of zero.
    let slot_count = match era_number {
        0 => 0,
        _ => E::slots_per_historical_root() as u32,
    };
    Ok(EraId::new(
        network_name,
        era_number * RETH_SLOTS_PER_HISTORICAL_ROOT,
        slot_count,
    )
    .with_hash(short_hash))
}

View File

@@ -0,0 +1,388 @@
//! ERA file consumer + producer tests using minimal preset test vectors.
//!
//! Test vectors: 13 ERA files from a Nimbus minimal testnet.
//! - Electra from genesis, Fulu at epoch 100000
//! - SLOTS_PER_HISTORICAL_ROOT = 64 (one ERA = 64 slots = 8 epochs)
//! - 13 ERA files covering 832 slots, 767 blocks, 1024 validators
//!
//! All subtests run from a single #[test] to avoid nextest download races
//! (same pattern as slashing_protection/tests/interop.rs).
use super::consumer::EraFileDir;
use reth_era::common::file_ops::StreamReader;
use serde::Deserialize;
use std::path::PathBuf;
use std::sync::{Arc, LazyLock};
use store::{DBColumn, HotColdDB, KeyValueStore, StoreConfig};
use types::{BeaconState, ChainSpec, Config, EthSpec, Hash256, MinimalEthSpec, Slot};
/// Shape of the `metadata.json` file shipped with the test vectors.
#[derive(Deserialize)]
struct Metadata {
    /// Slot of the chain head covered by the vectors.
    head_slot: u64,
    /// Hex-encoded block root expected at `head_slot`.
    head_root: String,
    /// Total number of ERA files in the vector set.
    era_count: u64,
}
/// Lazily download and unpack the ERA test vectors by invoking `make` in
/// `tests/era_test_vectors`, then resolve to the extracted `vectors` directory.
static TEST_VECTORS_DIR: LazyLock<PathBuf> = LazyLock::new(|| {
    let vectors_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("tests")
        .join("era_test_vectors");
    let output = std::process::Command::new("make")
        .current_dir(&vectors_root)
        .output()
        .expect("need `make` to download ERA test vectors");
    if !output.status.success() {
        // Surface the Makefile's stderr before failing so CI logs show why.
        eprintln!("{}", String::from_utf8_lossy(&output.stderr));
        panic!("Running `make` for ERA test vectors failed, see above");
    }
    vectors_root.join("vectors")
});
/// Path to the downloaded test vectors (first call triggers the download).
fn test_vectors_dir() -> &'static PathBuf {
    &TEST_VECTORS_DIR
}
/// Load the chain spec for the vectors: minimal preset overridden by the
/// test vectors' `config.yaml`.
fn load_test_spec() -> ChainSpec {
    let config_path = test_vectors_dir().join("config.yaml");
    let raw = std::fs::read_to_string(config_path).expect("read config.yaml");
    serde_yaml::from_str::<Config>(&raw)
        .expect("parse config")
        .apply_to_chain_spec::<MinimalEthSpec>(&ChainSpec::minimal())
        .expect("apply config")
}
/// Decode the genesis `BeaconState` from the ERA 0 test-vector file
/// (the file whose name contains `-00000-`).
fn load_genesis_state(spec: &ChainSpec) -> BeaconState<MinimalEthSpec> {
    let era_dir = test_vectors_dir().join("era");
    let era0_entry = std::fs::read_dir(&era_dir)
        .expect("read era dir")
        .filter_map(Result::ok)
        .find(|entry| entry.file_name().to_string_lossy().contains("-00000-"))
        .expect("ERA 0 file must exist");
    let reader = std::fs::File::open(era0_entry.path()).expect("open ERA 0");
    let assembled = reth_era::era::file::EraReader::new(reader)
        .read_and_assemble("minimal".to_string())
        .expect("parse ERA 0");
    let ssz_bytes = assembled
        .group
        .era_state
        .decompress()
        .expect("decompress ERA 0 state");
    BeaconState::from_ssz_bytes(&ssz_bytes, spec).expect("decode genesis state from ERA 0")
}
/// Fully in-memory hot/cold store used by every ERA test below.
type TestStore = HotColdDB<
    MinimalEthSpec,
    store::MemoryStore<MinimalEthSpec>,
    store::MemoryStore<MinimalEthSpec>,
>;
/// Import every ERA test vector into a fresh ephemeral store.
///
/// Returns the populated store, the spec, and the highest era number found.
fn import_all_era_files() -> (TestStore, ChainSpec, u64) {
    let spec = load_test_spec();
    let vectors_era_dir = test_vectors_dir().join("era");
    let era_dir =
        EraFileDir::new::<MinimalEthSpec>(&vectors_era_dir, &spec).expect("open ERA dir");
    let max_era = era_dir.max_era();
    let store = HotColdDB::open_ephemeral(StoreConfig::default(), Arc::new(spec.clone()))
        .expect("create store");
    let mut genesis_state = load_genesis_state(&spec);
    era_dir
        .import_all(&store, &mut genesis_state, &spec)
        .expect("import all ERA files");
    (store, spec, max_era)
}
/// Build an ephemeral store seeded with only the genesis state in the cold DB.
fn empty_store(spec: &ChainSpec) -> TestStore {
    let store =
        HotColdDB::open_ephemeral(StoreConfig::default(), Arc::new(spec.clone())).expect("store");
    let mut genesis_state = load_genesis_state(spec);
    let genesis_root = genesis_state.canonical_root().expect("hash");
    let mut ops = Vec::new();
    store
        .store_cold_state(&genesis_root, &genesis_state, &mut ops)
        .expect("ops");
    store.cold_db.do_atomically(ops).expect("write");
    store
}
/// Copy the test-vector ERA directory into a fresh temp dir, replacing the one
/// file whose name contains `target_pattern` with the corrupt variant
/// `corrupt_file` (taken from the vectors' `corrupt/` subdirectory).
fn era_dir_with_corrupt(corrupt_file: &str, target_pattern: &str) -> tempfile::TempDir {
    let temp_dir = tempfile::TempDir::new().expect("tmp");
    let vectors = test_vectors_dir();
    let era_dst = temp_dir.path().join("era");
    std::fs::create_dir_all(&era_dst).expect("mkdir");
    for entry in std::fs::read_dir(vectors.join("era")).expect("readdir") {
        let entry = entry.expect("entry");
        let file_name = entry.file_name().to_string_lossy().to_string();
        if file_name.contains(target_pattern) {
            // Substitute the corrupt variant under the original file's name.
            std::fs::copy(
                vectors.join("corrupt").join(corrupt_file),
                era_dst.join(&file_name),
            )
            .expect("copy corrupt");
        } else {
            std::fs::copy(entry.path(), era_dst.join(&file_name)).expect("copy");
        }
    }
    temp_dir
}
/// Import eras `0..target_era` from a directory where `corrupt_file` replaces
/// the file matching `target_pattern`, then require that importing
/// `target_era` fails with an error containing `expected_err`.
fn assert_import_fails(
    corrupt_file: &str,
    target_pattern: &str,
    target_era: u64,
    expected_err: &str,
) {
    let corrupt_dir = era_dir_with_corrupt(corrupt_file, target_pattern);
    let spec = load_test_spec();
    let era_dir = EraFileDir::new::<MinimalEthSpec>(&corrupt_dir.path().join("era"), &spec)
        .expect("init should succeed");
    let store = empty_store(&spec);
    // Everything before the corrupted era must import cleanly.
    for era in 0..target_era {
        era_dir
            .import_era_file(&store, era, &spec, None)
            .unwrap_or_else(|e| panic!("ERA {era}: {e}"));
    }
    let err = era_dir
        .import_era_file(&store, target_era, &spec, None)
        .unwrap_err();
    assert!(
        err.contains(expected_err),
        "expected \"{expected_err}\", got: {err}"
    );
}
/// Parse the pristine ERA 3 test vector and return its state's canonical root
/// and slot, for use as a trusted-root input to the importer.
fn era3_correct_root_and_slot(spec: &ChainSpec) -> (Hash256, types::Slot) {
    let entry = std::fs::read_dir(test_vectors_dir().join("era"))
        .expect("readdir")
        .filter_map(Result::ok)
        .find(|e| e.file_name().to_string_lossy().contains("-00003-"))
        .expect("ERA 3");
    let reader = std::fs::File::open(entry.path()).expect("open");
    let assembled = reth_era::era::file::EraReader::new(reader)
        .read_and_assemble("minimal".to_string())
        .expect("parse");
    let state_bytes = assembled.group.era_state.decompress().expect("decompress");
    let mut state: BeaconState<MinimalEthSpec> =
        BeaconState::from_ssz_bytes(&state_bytes, spec).expect("decode");
    let root = state.canonical_root().expect("root");
    let slot = state.slot();
    (root, slot)
}
// Single #[test] to avoid nextest parallel download races.
// See slashing_protection/tests/interop.rs for the same pattern.
// Subtests are plain `fn`s invoked sequentially so the vectors are downloaded
// exactly once (via TEST_VECTORS_DIR) before any of them run.
#[test]
fn era_test_vectors() {
    consumer_imports_and_verifies();
    producer_output_is_byte_identical();
    rejects_corrupted_block_decompression();
    rejects_corrupted_genesis_state();
    rejects_corrupted_middle_state();
    rejects_corrupted_reference_state();
    rejects_wrong_era_content();
    rejects_wrong_era_root();
    rejects_corrupt_block_summary();
    rejects_wrong_block_root();
    rejects_mutated_state_with_trusted_root();
    rejects_wrong_trusted_state_root();
}
/// Read and parse `metadata.json` from the test vectors directory.
fn load_metadata() -> Metadata {
    let metadata_path = test_vectors_dir().join("metadata.json");
    let raw = std::fs::read_to_string(&metadata_path).expect("read metadata.json");
    serde_json::from_str(&raw).expect("parse metadata.json")
}
/// Import all vectors and verify the imported head block root and block body
/// against the expectations recorded in `metadata.json`.
fn consumer_imports_and_verifies() {
    let metadata = load_metadata();
    let (store, _spec, max_era) = import_all_era_files();
    let slots_per_era = MinimalEthSpec::slots_per_historical_root() as u64;
    assert_eq!(max_era + 1, metadata.era_count, "era count mismatch");
    // The last indexed slot is (max_era * slots_per_era - 1), since the ERA boundary
    // state covers [start_slot, end_slot) where end_slot = era_number * slots_per_era.
    let last_slot = max_era * slots_per_era - 1;
    // Verify head block root matches metadata
    let head_key = metadata.head_slot.to_be_bytes().to_vec();
    let head_root_bytes = store
        .cold_db
        .get_bytes(DBColumn::BeaconBlockRoots, &head_key)
        .expect("read head root index")
        .unwrap_or_else(|| panic!("no block root at head slot {}", metadata.head_slot));
    let head_root = Hash256::from_slice(&head_root_bytes);
    let expected_head_root_bytes = hex::decode(&metadata.head_root).expect("decode head_root hex");
    let expected_head_root = Hash256::from_slice(&expected_head_root_bytes);
    assert_eq!(
        head_root, expected_head_root,
        "head root mismatch at slot {}: got {head_root:?}",
        metadata.head_slot
    );
    // Verify the head block exists and has the correct root
    let head_block = store
        .get_full_block(&head_root)
        .expect("query head block")
        .unwrap_or_else(|| panic!("head block missing at slot {}", metadata.head_slot));
    assert_eq!(head_block.canonical_root(), head_root);
    assert_eq!(
        head_block.slot(),
        Slot::new(metadata.head_slot),
        "last indexed slot is {last_slot}"
    );
}
fn producer_output_is_byte_identical() {
let (store, _spec, max_era) = import_all_era_files();
let output = PathBuf::from("/tmp/era_producer_test_output");
let _ = std::fs::remove_dir_all(&output);
std::fs::create_dir_all(&output).expect("mkdir");
for era in 0..=max_era {
super::producer::create_era_file(&store, era, &output)
.unwrap_or_else(|e| panic!("produce ERA {era}: {e}"));
}
let mut originals: Vec<_> = std::fs::read_dir(test_vectors_dir().join("era"))
.expect("readdir")
.filter_map(|e| e.ok())
.filter(|e| e.file_name().to_string_lossy().ends_with(".era"))
.collect();
originals.sort_by_key(|e| e.file_name());
let mut produced: Vec<_> = std::fs::read_dir(&output)
.expect("readdir")
.filter_map(|e| e.ok())
.filter(|e| e.file_name().to_string_lossy().ends_with(".era"))
.collect();
produced.sort_by_key(|e| e.file_name());
assert_eq!(originals.len(), produced.len(), "file count mismatch");
for (orig, prod) in originals.iter().zip(produced.iter()) {
assert_eq!(
std::fs::read(orig.path()).expect("read"),
std::fs::read(prod.path()).expect("read"),
"ERA mismatch: {:?}",
orig.file_name()
);
}
}
/// A corrupted compressed block payload in ERA 1 must fail decompression.
fn rejects_corrupted_block_decompression() {
    assert_import_fails("era1-corrupt-block.era", "-00001-", 1, "decompress");
}
/// A corrupted genesis state payload in ERA 0 must fail decompression.
fn rejects_corrupted_genesis_state() {
    assert_import_fails("era0-corrupt-state.era", "-00000-", 0, "decompress");
}
/// A corrupted state payload mid-chain (ERA 5) must fail decompression.
fn rejects_corrupted_middle_state() {
    assert_import_fails("era5-corrupt-state.era", "-00005-", 5, "decompress");
}
/// A corrupted final-era state must make `EraFileDir::new` itself fail,
/// not just a later import.
fn rejects_corrupted_reference_state() {
    let tmp = era_dir_with_corrupt("era12-corrupt-state.era", "-00012-");
    let spec = load_test_spec();
    match EraFileDir::new::<MinimalEthSpec>(&tmp.path().join("era"), &spec) {
        Ok(_) => panic!("should fail with corrupted reference state"),
        Err(err) => assert!(err.contains("decompress"), "expected decompress: {err}"),
    }
}
/// An ERA 3 file carrying a state for the wrong slot must be rejected.
fn rejects_wrong_era_content() {
    assert_import_fails(
        "era3-wrong-content.era",
        "-00003-",
        3,
        "era state slot mismatch",
    );
}
/// An ERA 0 file whose root does not match must be rejected.
fn rejects_wrong_era_root() {
    assert_import_fails("era0-wrong-root.era", "-00000-", 0, "era root mismatch");
}
/// A tampered block summary in ERA 8 must be rejected.
fn rejects_corrupt_block_summary() {
    assert_import_fails(
        "era8-corrupt-block-summary.era",
        "-00008-",
        8,
        "block summary root post-capella mismatch",
    );
}
/// A block with a wrong root in ERA 2 must be rejected.
fn rejects_wrong_block_root() {
    assert_import_fails(
        "era2-wrong-block-root.era",
        "-00002-",
        2,
        "block root mismatch",
    );
}
/// Importing the `era3-wrong-state-root` vector must fail when the trusted
/// state root taken from the *pristine* ERA 3 file is supplied.
fn rejects_mutated_state_with_trusted_root() {
    let tmp = era_dir_with_corrupt("era3-wrong-state-root.era", "-00003-");
    let spec = load_test_spec();
    let era_dir = EraFileDir::new::<MinimalEthSpec>(&tmp.path().join("era"), &spec)
        .expect("init should succeed");
    let store = empty_store(&spec);
    // Eras 0..=2 are untouched and must import cleanly.
    for era in 0..3 {
        era_dir
            .import_era_file(&store, era, &spec, None)
            .unwrap_or_else(|e| panic!("ERA {era}: {e}"));
    }
    // Trusted root from the pristine vector cannot match the corrupt copy.
    let (correct_root, slot) = era3_correct_root_and_slot(&spec);
    let err = era_dir
        .import_era_file(&store, 3, &spec, Some((correct_root, slot)))
        .unwrap_err();
    assert!(
        err.contains("trusted state root mismatch"),
        "expected trusted state root mismatch: {err}"
    );
}
/// With pristine vectors, the correct trusted state root must be accepted
/// while a root with one flipped bit must be rejected.
fn rejects_wrong_trusted_state_root() {
    let spec = load_test_spec();
    let store = empty_store(&spec);
    let era_dir_path = test_vectors_dir().join("era");
    let era_dir = EraFileDir::new::<MinimalEthSpec>(&era_dir_path, &spec).expect("open");
    for era in 0..=2 {
        era_dir
            .import_era_file(&store, era, &spec, None)
            .unwrap_or_else(|e| panic!("ERA {era}: {e}"));
    }
    // Positive case: the genuine root must pass.
    let (correct_root, slot) = era3_correct_root_and_slot(&spec);
    era_dir
        .import_era_file(&store, 3, &spec, Some((correct_root, slot)))
        .expect("correct root should pass");
    // Negative case: flip a single bit of the root.
    let wrong_root = {
        let mut bytes: [u8; 32] = correct_root.into();
        bytes[0] ^= 0x01;
        Hash256::from(bytes)
    };
    // A second fresh store, since ERA 3 was already imported into the first.
    let store2 = empty_store(&spec);
    for era in 0..=2 {
        era_dir
            .import_era_file(&store2, era, &spec, None)
            .unwrap_or_else(|e| panic!("ERA {era}: {e}"));
    }
    let err = era_dir
        .import_era_file(&store2, 3, &spec, Some((wrong_root, slot)))
        .unwrap_err();
    assert!(
        err.contains("trusted state root mismatch"),
        "expected trusted state root mismatch: {err}"
    );
}

View File

@@ -21,6 +21,7 @@ pub mod custody_context;
pub mod data_availability_checker;
pub mod data_column_verification;
mod early_attester_cache;
pub mod era;
mod errors;
pub mod events;
pub mod execution_payload;

View File

@@ -0,0 +1,2 @@
vectors
*.tar.gz

View File

@@ -0,0 +1,17 @@
# Pinned commit of dapplion/era-test-vectors to download.
VECTORS_TAG := 52eb9dd94a09153b8b07c9bba4b08adca0d6e219
OUTPUT_DIR := vectors
TARBALL := $(OUTPUT_DIR)-$(VECTORS_TAG).tar.gz
ARCHIVE_URL := https://github.com/dapplion/era-test-vectors/tarball/$(VECTORS_TAG)

# Extract the downloaded tarball into $(OUTPUT_DIR), stripping the
# top-level repository directory GitHub adds to the archive.
$(OUTPUT_DIR): $(TARBALL)
	rm -rf $@
	mkdir -p $@
	tar --strip-components=1 -xzf $^ -C $@

# Download the pinned tarball; --fail makes curl error on HTTP failures.
$(TARBALL):
	curl --fail -L -o $@ $(ARCHIVE_URL)

clean:
	rm -rf $(OUTPUT_DIR) $(TARBALL)

.PHONY: clean

View File

@@ -2076,6 +2076,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(())
}
/// Store a pre-finalization state in the freezer database, applying ops atomically.
pub fn put_cold_state(
    &self,
    state_root: &Hash256,
    state: &BeaconState<E>,
) -> Result<(), Error> {
    // Collect the key-value operations for this state, then commit them in
    // a single atomic batch against the cold database.
    let mut batch: Vec<KeyValueStoreOp> = Vec::new();
    self.store_cold_state(state_root, state, &mut batch)?;
    self.cold_db.do_atomically(batch)
}
/// Store a pre-finalization state in the freezer database.
pub fn store_cold_state(
&self,

View File

@@ -47,4 +47,8 @@ impl HistoricalSummary {
state_summary_root: state.state_roots().tree_hash_root(),
}
}
/// Returns this summary's `block_summary_root` field.
pub fn block_summary_root(&self) -> Hash256 {
    self.block_summary_root
}
}

View File

@@ -0,0 +1,79 @@
use beacon_chain::era::consumer::EraFileDir;
use clap::ArgMatches;
use clap_utils::parse_required;
use environment::Environment;
use eth2_network_config::Eth2NetworkConfig;
use std::path::{Path, PathBuf};
use std::time::Duration;
use store::database::interface::BeaconNodeBackend;
use store::{HotColdDB, StoreConfig};
use tracing::info;
use types::EthSpec;
/// Returns `true` if `path` exists and contains at least one directory entry.
///
/// Takes `&Path` (idiomatic over `&PathBuf`; callers' `&PathBuf` arguments
/// deref-coerce). Unreadable directories are treated as empty, keeping this a
/// best-effort pre-flight check.
fn is_dir_non_empty(path: &Path) -> bool {
    path.exists()
        && std::fs::read_dir(path)
            .map(|mut entries| entries.next().is_some())
            .unwrap_or(false)
}
/// Run `lcli consume-era-files`: open fresh hot/cold/blobs databases under
/// `--datadir` and import every ERA file found in `--era-dir` into them.
///
/// Errors if the chain or freezer database directories already contain data,
/// since the importer expects an empty datadir.
pub fn run<E: EthSpec>(
    env: Environment<E>,
    network_config: Eth2NetworkConfig,
    matches: &ArgMatches,
) -> Result<(), String> {
    let datadir: PathBuf = parse_required(matches, "datadir")?;
    let era_dir: PathBuf = parse_required(matches, "era-dir")?;
    let hot_path = datadir.join("chain_db");
    let cold_path = datadir.join("freezer_db");
    let blobs_path = datadir.join("blobs_db");
    // Fail fast if database directories already contain data
    if is_dir_non_empty(&hot_path) || is_dir_non_empty(&cold_path) {
        return Err(format!(
            "Database directories are not empty: {} / {}. \
            This command expects a fresh datadir.",
            hot_path.display(),
            cold_path.display(),
        ));
    }
    let spec = env.eth2_config.spec.clone();
    info!(
        hot_path = %hot_path.display(),
        cold_path = %cold_path.display(),
        era_dir = %era_dir.display(),
        "Opening database"
    );
    std::fs::create_dir_all(&hot_path).map_err(|e| format!("Failed to create hot db dir: {e}"))?;
    std::fs::create_dir_all(&cold_path)
        .map_err(|e| format!("Failed to create cold db dir: {e}"))?;
    std::fs::create_dir_all(&blobs_path)
        .map_err(|e| format!("Failed to create blobs db dir: {e}"))?;
    // The closure argument is a deliberate no-op for a brand-new database.
    let db = HotColdDB::<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>::open(
        &hot_path,
        &cold_path,
        &blobs_path,
        |_, _, _| Ok(()),
        StoreConfig::default(),
        spec.clone(),
    )
    .map_err(|e| format!("Failed to open database: {e:?}"))?;
    // Fetch the network genesis state as the anchor for the import.
    let mut genesis_state = env
        .runtime()
        .block_on(network_config.genesis_state::<E>(None, Duration::from_secs(120)))
        .map_err(|e| format!("Failed to load genesis state: {e}"))?
        .ok_or("No genesis state available for this network")?;
    let era_file_dir = EraFileDir::new::<E>(&era_dir, &spec)
        .map_err(|e| format!("Failed to open ERA dir: {e}"))?;
    era_file_dir.import_all(&db, &mut genesis_state, &spec)?;
    Ok(())
}

View File

@@ -1,11 +1,13 @@
mod block_root;
mod check_deposit_data;
mod consume_era_files;
mod generate_bootnode_enr;
mod http_sync;
mod indexed_attestations;
mod mnemonic_validators;
mod mock_el;
mod parse_ssz;
mod produce_era_files;
mod skip_slots;
mod state_root;
mod transition_blocks;
@@ -571,6 +573,50 @@ fn main() {
.display_order(0)
)
)
.subcommand(
Command::new("consume-era-files")
.about("Import ERA files into an empty database, producing a ready-to-use beacon node DB.")
.arg(
Arg::new("datadir")
.long("datadir")
.value_name("PATH")
.action(ArgAction::Set)
.required(true)
.help("Path to the beacon node data directory (will create chain_db, freezer_db, blobs_db inside).")
.display_order(0)
)
.arg(
Arg::new("era-dir")
.long("era-dir")
.value_name("PATH")
.action(ArgAction::Set)
.required(true)
.help("Directory containing ERA files to import.")
.display_order(0)
)
)
.subcommand(
Command::new("produce-era-files")
.about("Produce ERA files from a fully reconstructed beacon node database.")
.arg(
Arg::new("datadir")
.long("datadir")
.value_name("PATH")
.action(ArgAction::Set)
.required(true)
.help("Path to the beacon node data directory (containing chain_db, freezer_db, blobs_db).")
.display_order(0)
)
.arg(
Arg::new("output-dir")
.long("output-dir")
.value_name("PATH")
.action(ArgAction::Set)
.required(true)
.help("Directory to write ERA files to. Created if it does not exist.")
.display_order(0)
)
)
.subcommand(
Command::new("http-sync")
.about("Manual sync")
@@ -765,6 +811,13 @@ fn run<E: EthSpec>(env_builder: EnvironmentBuilder<E>, matches: &ArgMatches) ->
}
Some(("mock-el", matches)) => mock_el::run::<E>(env, matches)
.map_err(|e| format!("Failed to run mock-el command: {}", e)),
Some(("consume-era-files", matches)) => {
let network_config = get_network_config()?;
consume_era_files::run::<E>(env, network_config, matches)
.map_err(|e| format!("Failed to consume ERA files: {}", e))
}
Some(("produce-era-files", matches)) => produce_era_files::run::<E>(env, matches)
.map_err(|e| format!("Failed to produce ERA files: {}", e)),
Some(("http-sync", matches)) => {
let network_config = get_network_config()?;
http_sync::run::<E>(env, network_config, matches)

View File

@@ -0,0 +1,90 @@
use beacon_chain::era::producer;
use clap::ArgMatches;
use clap_utils::parse_required;
use environment::Environment;
use std::path::PathBuf;
use store::database::interface::BeaconNodeBackend;
use store::{HotColdDB, StoreConfig};
use tracing::info;
use types::EthSpec;
/// Run `lcli produce-era-files`: open an existing beacon node database under
/// `--datadir` and write one ERA file per completed era into `--output-dir`.
///
/// Requires historic-state reconstruction and block backfill to be complete,
/// since every era needs all of its blocks and its boundary state.
pub fn run<E: EthSpec>(env: Environment<E>, matches: &ArgMatches) -> Result<(), String> {
    let datadir: PathBuf = parse_required(matches, "datadir")?;
    let output_dir: PathBuf = parse_required(matches, "output-dir")?;
    let hot_path = datadir.join("chain_db");
    let cold_path = datadir.join("freezer_db");
    let blobs_path = datadir.join("blobs_db");
    let spec = env.eth2_config.spec.clone();
    info!(
        hot_path = %hot_path.display(),
        cold_path = %cold_path.display(),
        output_dir = %output_dir.display(),
        "Opening database"
    );
    // The closure argument is a deliberate no-op here.
    let db = HotColdDB::<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>::open(
        &hot_path,
        &cold_path,
        &blobs_path,
        |_, _, _| Ok(()),
        StoreConfig::default(),
        spec,
    )
    .map_err(|e| format!("Failed to open database: {e:?}"))?;
    let anchor = db.get_anchor_info();
    let split = db.get_split_info();
    info!(
        anchor_slot = %anchor.anchor_slot,
        state_lower_limit = %anchor.state_lower_limit,
        state_upper_limit = %anchor.state_upper_limit,
        oldest_block_slot = %anchor.oldest_block_slot,
        split_slot = %split.slot,
        "Database info"
    );
    // Verify reconstruction is complete: state_lower_limit should equal state_upper_limit
    if !anchor.all_historic_states_stored() {
        return Err(format!(
            "State reconstruction is not complete. \
            state_lower_limit={}, state_upper_limit={}. \
            Run with --reconstruct-historic-states first.",
            anchor.state_lower_limit, anchor.state_upper_limit,
        ));
    }
    // Verify block backfill is complete
    if anchor.oldest_block_slot > 0 {
        return Err(format!(
            "Block backfill is not complete. oldest_block_slot={}. \
            Complete backfill sync first.",
            anchor.oldest_block_slot,
        ));
    }
    let slots_per_historical_root = E::slots_per_historical_root() as u64;
    // An ERA can only be created if its end slot <= split slot (finalized boundary)
    let max_era = split.slot.as_u64() / slots_per_historical_root;
    info!(max_era, "Producing ERA files from 0 to max_era");
    std::fs::create_dir_all(&output_dir)
        .map_err(|e| format!("Failed to create output directory: {e}"))?;
    for era_number in 0..=max_era {
        producer::create_era_file(&db, era_number, &output_dir)
            .map_err(|e| format!("Failed to produce ERA file {era_number}: {e}"))?;
        // Log progress every 100 eras and again at the very end.
        if (era_number + 1) % 100 == 0 || era_number == max_era {
            info!(era_number, max_era, "Progress");
        }
    }
    info!(max_era, "ERA file production complete");
    Ok(())
}