diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 09aa28e86b..1e1d7e9ebe 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3,6 +3,7 @@ use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; use crate::events::{EventHandler, EventKind}; use crate::fork_choice::{Error as ForkChoiceError, ForkChoice}; +use crate::head_tracker::HeadTracker; use crate::metrics; use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; use lmd_ghost::LmdGhost; @@ -126,6 +127,8 @@ pub struct BeaconChain { pub fork_choice: ForkChoice, /// A handler for events generated by the beacon chain. pub event_handler: T::EventHandler, + /// Used to track the heads of the beacon chain. + pub(crate) head_tracker: HeadTracker, /// Logging to CLI, etc. pub(crate) log: Logger, } @@ -141,6 +144,8 @@ impl BeaconChain { canonical_head: self.canonical_head.read().clone(), op_pool: PersistedOperationPool::from_operation_pool(&self.op_pool), genesis_block_root: self.genesis_block_root, + ssz_head_tracker: self.head_tracker.to_ssz_container(), + fork_choice_ssz_bytes: self.fork_choice.as_bytes(), }; let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes()); @@ -1219,6 +1224,8 @@ impl BeaconChain { metrics::stop_timer(db_write_timer); + self.head_tracker.register_block(block_root, &block); + let fork_choice_register_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE_REGISTER); diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index b8e3b868ff..620b996f7c 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,5 +1,6 @@ use crate::eth1_chain::CachingEth1Backend; use crate::events::NullEventHandler; +use crate::head_tracker::HeadTracker; use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; use crate::{ BeaconChain, BeaconChainTypes, CheckPoint, Eth1Chain, Eth1ChainBackend, EventHandler, @@ -9,7 +10,7 @@ use eth1::Config as Eth1Config; use lmd_ghost::{LmdGhost, ThreadSafeReducedTree}; use operation_pool::OperationPool; use parking_lot::RwLock; -use slog::{info, Logger}; +use slog::{error, info, Logger}; use slot_clock::{SlotClock, TestingSlotClock}; use std::marker::PhantomData; use std::sync::Arc; @@ -88,6 +89,8 @@ pub struct BeaconChainBuilder { eth1_chain: Option>, event_handler: Option, slot_clock: Option, + persisted_beacon_chain: Option>, + head_tracker: Option, spec: ChainSpec, log: Option, } @@ -128,6 +131,8 @@ where eth1_chain: None, event_handler: None, slot_clock: None, + persisted_beacon_chain: None, + head_tracker: None, spec: TEthSpec::default_spec(), log: None, } @@ -214,6 +219,10 @@ where self.finalized_checkpoint = Some(p.canonical_head); self.genesis_block_root = Some(p.genesis_block_root); + self.head_tracker = HeadTracker::from_ssz_container(&p.ssz_head_tracker) + .map_err(|e| error!(log, "Failed to decode head tracker for database: {:?}", e)) + .ok(); + Ok(self) } @@ -379,6 +388,7 @@ where event_handler: self .event_handler .ok_or_else(|| "Cannot build without an event handler".to_string())?, + head_tracker: self.head_tracker.unwrap_or_default(), log: log.clone(), }; @@ -418,21 +428,27 @@ where /// `ThreadSafeReducedTree` backend. /// /// Requires the store and state to be initialized. - pub fn empty_reduced_tree_fork_choice(self) -> Result { + pub fn reduced_tree_fork_choice(self) -> Result { let store = self .store .clone() .ok_or_else(|| "reduced_tree_fork_choice requires a store")?; - let finalized_checkpoint = &self - .finalized_checkpoint - .as_ref() - .expect("should have finalized checkpoint"); - let backend = ThreadSafeReducedTree::new( - store.clone(), - &finalized_checkpoint.beacon_block, - finalized_checkpoint.beacon_block_root, - ); + let backend = if let Some(persisted_beacon_chain) = &self.persisted_beacon_chain { + ThreadSafeReducedTree::from_bytes(&persisted_beacon_chain.fork_choice_ssz_bytes, store) + .map_err(|e| format!("Unable to decode fork choice from db: {:?}", e))? + } else { + let finalized_checkpoint = &self + .finalized_checkpoint + .as_ref() + .expect("should have finalized checkpoint"); + + ThreadSafeReducedTree::new( + store.clone(), + &finalized_checkpoint.beacon_block, + finalized_checkpoint.beacon_block_root, + ) + }; self.fork_choice_backend(backend) } @@ -611,7 +627,7 @@ mod test { .null_event_handler() .testing_slot_clock(Duration::from_secs(1)) .expect("should configure testing slot clock") - .empty_reduced_tree_fork_choice() + .reduced_tree_fork_choice() .expect("should add fork choice to builder") .build() .expect("should build"); diff --git a/beacon_node/beacon_chain/src/fork_choice.rs b/beacon_node/beacon_chain/src/fork_choice.rs index 9f9277693f..78b582828b 100644 --- a/beacon_node/beacon_chain/src/fork_choice.rs +++ b/beacon_node/beacon_chain/src/fork_choice.rs @@ -291,6 +291,14 @@ impl ForkChoice { .update_finalized_root(finalized_block, finalized_block_root) .map_err(Into::into) } + + /// Returns a byte-level representation of the present state of the fork choice cache. + /// + /// This simply calls `as_bytes()`, on the backend. To decode these bytes, decode the backend + /// directly then use `Self::new(..)`. + pub fn as_bytes(&self) -> Vec { + self.backend.as_bytes() + } } impl From for Error { diff --git a/beacon_node/beacon_chain/src/head_tracker.rs b/beacon_node/beacon_chain/src/head_tracker.rs new file mode 100644 index 0000000000..c3588b7e92 --- /dev/null +++ b/beacon_node/beacon_chain/src/head_tracker.rs @@ -0,0 +1,187 @@ +use parking_lot::RwLock; +use ssz_derive::{Decode, Encode}; +use std::collections::HashMap; +use std::iter::FromIterator; +use types::{BeaconBlock, EthSpec, Hash256, Slot}; + +#[derive(Debug, PartialEq)] +pub enum Error { + MismatchingLengths { roots_len: usize, slots_len: usize }, +} + +#[derive(Encode, Decode)] +pub struct SszHeadTracker { + roots: Vec, + slots: Vec, +} + +#[derive(Default, Debug)] +pub struct HeadTracker(RwLock>); + +impl HeadTracker { + pub fn register_block(&self, block_root: Hash256, block: &BeaconBlock) { + let mut map = self.0.write(); + + map.remove(&block.parent_root); + map.insert(block_root, block.slot); + } + + pub fn heads(&self) -> Vec<(Hash256, Slot)> { + self.0 + .read() + .iter() + .map(|(root, slot)| (*root, *slot)) + .collect() + } + + pub fn to_ssz_container(&self) -> SszHeadTracker { + let (roots, slots) = self + .0 + .read() + .iter() + .map(|(hash, slot)| (*hash, *slot)) + .unzip(); + + SszHeadTracker { roots, slots } + } + + pub fn from_ssz_container(ssz_container: &SszHeadTracker) -> Result { + let roots_len = ssz_container.roots.len(); + let slots_len = ssz_container.slots.len(); + + if roots_len != slots_len { + return Err(Error::MismatchingLengths { + roots_len, + slots_len, + }); + } else { + let map = HashMap::from_iter( + ssz_container + .roots + .iter() + .zip(ssz_container.slots.iter()) + .map(|(root, slot)| (*root, *slot)), + ); + + Ok(Self(RwLock::new(map))) + } + } +} + +impl PartialEq for HeadTracker { + fn eq(&self, other: &HeadTracker) -> bool { + *self.0.read() == *other.0.read() + } +} + +#[cfg(test)] +mod test { + use super::*; + use ssz::{Decode, Encode}; + use types::MainnetEthSpec; + + type E = MainnetEthSpec; + + #[test] + fn block_add() { + let spec = &E::default_spec(); + + let head_tracker = HeadTracker::default(); + + for i in 0..16 { + let mut block = BeaconBlock::empty(spec); + let block_root = Hash256::from_low_u64_be(i); + + block.slot = Slot::new(i); + block.parent_root = if i == 0 { + Hash256::random() + } else { + Hash256::from_low_u64_be(i - 1) + }; + + head_tracker.register_block::(block_root, &block); + } + + assert_eq!( + head_tracker.heads(), + vec![(Hash256::from_low_u64_be(15), Slot::new(15))], + "should only have one head" + ); + + let mut block = BeaconBlock::empty(spec); + let block_root = Hash256::from_low_u64_be(42); + block.slot = Slot::new(15); + block.parent_root = Hash256::from_low_u64_be(14); + head_tracker.register_block::(block_root, &block); + + let heads = head_tracker.heads(); + + assert_eq!(heads.len(), 2, "should only have two heads"); + assert!( + heads + .iter() + .any(|(root, slot)| *root == Hash256::from_low_u64_be(15) && *slot == Slot::new(15)), + "should contain first head" + ); + assert!( + heads + .iter() + .any(|(root, slot)| *root == Hash256::from_low_u64_be(42) && *slot == Slot::new(15)), + "should contain second head" + ); + } + + #[test] + fn empty_round_trip() { + let non_empty = HeadTracker::default(); + for i in 0..16 { + non_empty.0.write().insert(Hash256::random(), Slot::new(i)); + } + let bytes = non_empty.to_ssz_container().as_ssz_bytes(); + + assert_eq!( + HeadTracker::from_ssz_container( + &SszHeadTracker::from_ssz_bytes(&bytes).expect("should decode") + ), + Ok(non_empty), + "non_empty should pass round trip" + ); + } + + #[test] + fn non_empty_round_trip() { + let non_empty = HeadTracker::default(); + for i in 0..16 { + non_empty.0.write().insert(Hash256::random(), Slot::new(i)); + } + let bytes = non_empty.to_ssz_container().as_ssz_bytes(); + + assert_eq!( + HeadTracker::from_ssz_container( + &SszHeadTracker::from_ssz_bytes(&bytes).expect("should decode") + ), + Ok(non_empty), + "non_empty should pass round trip" + ); + } + + #[test] + fn bad_length() { + let container = SszHeadTracker { + roots: vec![Hash256::random()], + slots: vec![], + }; + let bytes = container.as_ssz_bytes(); + + assert_eq!( + HeadTracker::from_ssz_container( + &SszHeadTracker::from_ssz_bytes(&bytes).expect("should decode") + ), + Err(Error::MismatchingLengths { + roots_len: 1, + slots_len: 0 + }), + "should fail decoding with bad lengths" + ); + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 3e412fe67a..1112a6dd9a 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -9,6 +9,7 @@ mod errors; pub mod eth1_chain; pub mod events; mod fork_choice; +mod head_tracker; mod metrics; mod persisted_beacon_chain; pub mod test_utils; diff --git a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs index 52a2028fca..c98dbeec14 100644 --- a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs +++ b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs @@ -1,3 +1,4 @@ +use crate::head_tracker::SszHeadTracker; use crate::{BeaconChainTypes, CheckPoint}; use operation_pool::PersistedOperationPool; use ssz::{Decode, Encode}; @@ -13,6 +14,8 @@ pub struct PersistedBeaconChain { pub canonical_head: CheckPoint, pub op_pool: PersistedOperationPool, pub genesis_block_root: Hash256, + pub ssz_head_tracker: SszHeadTracker, + pub fork_choice_ssz_bytes: Vec, } impl SimpleStoreItem for PersistedBeaconChain { diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 6916982e0a..c52630e4f6 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -100,7 +100,7 @@ impl BeaconChainHarness> { .null_event_handler() .testing_slot_clock(Duration::from_secs(1)) .expect("should configure testing slot clock") - .empty_reduced_tree_fork_choice() + .reduced_tree_fork_choice() .expect("should add fork choice to builder") .build() .expect("should build"); @@ -142,7 +142,7 @@ impl BeaconChainHarness> { .null_event_handler() .testing_slot_clock(Duration::from_secs(1)) .expect("should configure testing slot clock") - .empty_reduced_tree_fork_choice() + .reduced_tree_fork_choice() .expect("should add fork choice to builder") .build() .expect("should build"); diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 1ebf96d769..325043e653 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -493,7 +493,7 @@ where .clone() .ok_or_else(|| "beacon_chain requires a slot clock")?, ) - .empty_reduced_tree_fork_choice() + .reduced_tree_fork_choice() .map_err(|e| format!("Failed to init fork choice: {}", e))? .build() .map_err(|e| format!("Failed to build beacon chain: {}", e))?; diff --git a/eth2/lmd_ghost/Cargo.toml b/eth2/lmd_ghost/Cargo.toml index d78faaab39..1a24537869 100644 --- a/eth2/lmd_ghost/Cargo.toml +++ b/eth2/lmd_ghost/Cargo.toml @@ -9,6 +9,8 @@ parking_lot = "0.9.0" store = { path = "../../beacon_node/store" } types = { path = "../types" } itertools = "0.8.1" +eth2_ssz = "0.1.2" +eth2_ssz_derive = "0.1.0" [dev-dependencies] criterion = "0.3.0" diff --git a/eth2/lmd_ghost/src/lib.rs b/eth2/lmd_ghost/src/lib.rs index d58affe146..c5fd95bc55 100644 --- a/eth2/lmd_ghost/src/lib.rs +++ b/eth2/lmd_ghost/src/lib.rs @@ -8,7 +8,7 @@ pub use reduced_tree::ThreadSafeReducedTree; pub type Result = std::result::Result; -pub trait LmdGhost: Send + Sync { +pub trait LmdGhost: Send + Sync + Sized { /// Create a new instance, with the given `store` and `finalized_root`. fn new(store: Arc, finalized_block: &BeaconBlock, finalized_root: Hash256) -> Self; @@ -52,4 +52,10 @@ pub trait LmdGhost: Send + Sync { /// Returns `Ok(())` if the underlying fork choice has maintained its integrity, /// `Err(description)` otherwise. fn verify_integrity(&self) -> Result<()>; + + /// Encode the `LmdGhost` instance to bytes. + fn as_bytes(&self) -> Vec; + + /// Create a new `LmdGhost` instance given a `store` and encoded bytes. + fn from_bytes(bytes: &[u8], store: Arc) -> Result; } diff --git a/eth2/lmd_ghost/src/reduced_tree.rs b/eth2/lmd_ghost/src/reduced_tree.rs index e26d472a1b..c6cb71a334 100644 --- a/eth2/lmd_ghost/src/reduced_tree.rs +++ b/eth2/lmd_ghost/src/reduced_tree.rs @@ -6,6 +6,8 @@ use super::{LmdGhost, Result as SuperResult}; use itertools::Itertools; use parking_lot::RwLock; +use ssz::{Decode, Encode}; +use ssz_derive::{Decode, Encode}; use std::collections::HashMap; use std::fmt; use std::marker::PhantomData; @@ -26,6 +28,8 @@ pub enum Error { NoCommonAncestor((Hash256, Hash256)), StoreError(StoreError), ValidatorWeightUnknown(usize), + SszDecodingError(ssz::DecodeError), + InvalidReducedTreeSsz(String), } impl From for Error { @@ -34,6 +38,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: ssz::DecodeError) -> Error { + Error::SszDecodingError(e) + } +} + pub struct ThreadSafeReducedTree { core: RwLock>, } @@ -106,11 +116,73 @@ where self.core.read().latest_message(validator_index) } - fn verify_integrity(&self) -> std::result::Result<(), String> { + fn verify_integrity(&self) -> SuperResult<()> { self.core.read().verify_integrity() } + + /// Consume the `ReducedTree` object and return its ssz encoded bytes representation. + fn as_bytes(&self) -> Vec { + self.core.read().as_bytes() + } + + /// Create a new `ThreadSafeReducedTree` instance from a `store` and the + /// encoded ssz bytes representation. + /// + /// Returns an error if ssz bytes are not a valid `ReducedTreeSsz` object. + fn from_bytes(bytes: &[u8], store: Arc) -> SuperResult { + Ok(ThreadSafeReducedTree { + core: RwLock::new( + ReducedTree::from_bytes(bytes, store) + .map_err(|e| format!("Cannot decode ssz bytes {:?}", e))?, + ), + }) + } } +/// Intermediate representation of a `ReducedTree` `LmdGhost` fork choice. +#[derive(Debug, PartialEq, Encode, Decode)] +struct ReducedTreeSsz { + pub node_hashes: Vec, + pub nodes: Vec, + pub latest_votes: Vec>, + pub root_hash: Hash256, + pub root_slot: Slot, +} + +impl ReducedTreeSsz { + pub fn from_reduced_tree(tree: &ReducedTree) -> Self { + let (node_hashes, nodes): (Vec<_>, Vec<_>) = tree.nodes.clone().into_iter().unzip(); + ReducedTreeSsz { + node_hashes, + nodes, + latest_votes: tree.latest_votes.0.clone(), + root_hash: tree.root.0, + root_slot: tree.root.1, + } + } + + pub fn to_reduced_tree(self, store: Arc) -> Result> { + if self.node_hashes.len() != self.nodes.len() { + Error::InvalidReducedTreeSsz("node_hashes and nodes should have equal length".into()); + } + let nodes: HashMap<_, _> = self + .node_hashes + .into_iter() + .zip(self.nodes.into_iter()) + .collect(); + let latest_votes = ElasticList(self.latest_votes); + let root = (self.root_hash, self.root_slot); + Ok(ReducedTree { + store, + nodes, + latest_votes, + root, + _phantom: PhantomData, + }) + } +} + +#[derive(Clone)] struct ReducedTree { store: Arc, /// Stores all nodes of the tree, keyed by the block hash contained in the node. @@ -763,9 +835,19 @@ where fn root_slot(&self) -> Slot { self.root.1 } + + fn as_bytes(&self) -> Vec { + let reduced_tree_ssz: ReducedTreeSsz = ReducedTreeSsz::from_reduced_tree(&self); + reduced_tree_ssz.as_ssz_bytes() + } + + fn from_bytes(bytes: &[u8], store: Arc) -> Result { + let reduced_tree_ssz = ReducedTreeSsz::from_ssz_bytes(bytes)?; + Ok(reduced_tree_ssz.to_reduced_tree(store)?) + } } -#[derive(Default, Clone, Debug)] +#[derive(Default, Clone, Debug, PartialEq, Encode, Decode)] pub struct Node { /// Hash of the parent node in the reduced tree (not necessarily parent block). pub parent_hash: Option, @@ -775,7 +857,7 @@ pub struct Node { pub voters: Vec, } -#[derive(Default, Clone, Debug)] +#[derive(Default, Clone, Debug, PartialEq, Encode, Decode)] pub struct ChildLink { /// Hash of the child block (may not be a direct descendant). pub hash: Hash256, @@ -826,7 +908,7 @@ impl Node { } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Encode, Decode)] pub struct Vote { hash: Hash256, slot: Slot, @@ -869,3 +951,27 @@ impl From for String { format!("{:?}", e) } } + +#[cfg(test)] +mod tests { + use super::*; + use store::MemoryStore; + use types::eth_spec::MinimalEthSpec; + + #[test] + fn test_reduced_tree_ssz() { + let store = Arc::new(MemoryStore::open()); + let tree = ReducedTree::::new( + store.clone(), + &BeaconBlock::empty(&MinimalEthSpec::default_spec()), + Hash256::zero(), + ); + let ssz_tree = ReducedTreeSsz::from_reduced_tree(&tree); + let bytes = tree.as_bytes(); + let recovered_tree = + ReducedTree::::from_bytes(&bytes, store.clone()).unwrap(); + + let recovered_ssz = ReducedTreeSsz::from_reduced_tree(&recovered_tree); + assert_eq!(ssz_tree, recovered_ssz); + } +} diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index 631e799271..cb3ef42c94 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -170,7 +170,7 @@ pub struct Environment { runtime: Runtime, log: Logger, eth_spec_instance: E, - eth2_config: Eth2Config, + pub eth2_config: Eth2Config, } impl Environment { diff --git a/tests/beacon_chain_sim/src/main.rs b/tests/beacon_chain_sim/src/main.rs index 0853096d0d..a93a309eef 100644 --- a/tests/beacon_chain_sim/src/main.rs +++ b/tests/beacon_chain_sim/src/main.rs @@ -28,6 +28,8 @@ fn simulation(num_nodes: usize, validators_per_node: usize) -> Result<(), String .multi_threaded_tokio_runtime()? .build()?; + env.eth2_config.spec.milliseconds_per_slot = 2_000; + let mut base_config = testing_client_config(); let now = SystemTime::now()