From f4621a9f1ae6bc6ae0c645583a27e3e4a60cfe92 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 15 Jun 2019 18:19:08 -0400 Subject: [PATCH] Improve reduced tree fork choice --- beacon_node/beacon_chain/src/beacon_chain.rs | 23 +- beacon_node/beacon_chain/src/fork_choice.rs | 50 ++-- beacon_node/beacon_chain/src/lib.rs | 1 + beacon_node/client/src/beacon_chain_types.rs | 26 +-- eth2/lmd_ghost/src/lib.rs | 6 +- eth2/lmd_ghost/src/reduced_tree.rs | 226 ++++++++++++++----- 6 files changed, 225 insertions(+), 107 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 31b1759e79..9e3f6b52cf 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -87,7 +87,6 @@ impl BeaconChain { mut genesis_state: BeaconState, genesis_block: BeaconBlock, spec: ChainSpec, - fork_choice: ForkChoice, ) -> Result { let state_root = genesis_state.canonical_root(); store.put(&state_root, &genesis_state)?; @@ -110,14 +109,14 @@ impl BeaconChain { Ok(Self { spec, - store, slot_clock, op_pool: OperationPool::new(), state: RwLock::new(genesis_state), canonical_head, genesis_block_root, - fork_choice, + fork_choice: ForkChoice::new(store.clone(), genesis_block_root), metrics: Metrics::new()?, + store, }) } @@ -139,16 +138,15 @@ impl BeaconChain { spec.seconds_per_slot, ); - // let fork_choice = T::ForkChoice::new(store.clone()); - // let fork_choice: ForkChoice = ForkChoice::new(store.clone()); + let last_finalized_root = p.canonical_head.beacon_state.finalized_root; Ok(Some(BeaconChain { spec, slot_clock, + fork_choice: ForkChoice::new(store.clone(), last_finalized_root), op_pool: OperationPool::default(), canonical_head: RwLock::new(p.canonical_head), state: RwLock::new(p.state), - fork_choice: ForkChoice::new(store.clone()), genesis_block_root: p.genesis_block_root, metrics: Metrics::new()?, store, @@ -476,11 +474,22 @@ impl BeaconChain { .op_pool .insert_attestation(attestation, &*self.state.read(), &self.spec); + timer.observe_duration(); + if result.is_ok() { self.metrics.attestation_processing_successes.inc(); } - timer.observe_duration(); + // TODO: process attestation. Please consider: + // + // - Because a block was not added to the op pool does not mean it's invalid (it might + // just be old). + // - The attestation should be rejected if we don't know the block (ideally it should be + // queued, but this may be overkill). + // - The attestation _must_ be validated against it's state before being added to fork + // choice. + // - You can avoid verifying some attestations by first checking if they're a latest + // message. This would involve expanding the `LmdGhost` API. result } diff --git a/beacon_node/beacon_chain/src/fork_choice.rs b/beacon_node/beacon_chain/src/fork_choice.rs index da20e934ff..eadb69ce95 100644 --- a/beacon_node/beacon_chain/src/fork_choice.rs +++ b/beacon_node/beacon_chain/src/fork_choice.rs @@ -21,9 +21,9 @@ pub struct ForkChoice { } impl ForkChoice { - pub fn new(store: Arc) -> Self { + pub fn new(store: Arc, genesis_block_root: Hash256) -> Self { Self { - backend: T::LmdGhost::new(store), + backend: T::LmdGhost::new(store, genesis_block_root), } } @@ -67,7 +67,29 @@ impl ForkChoice { .map_err(Into::into) } - pub fn process_attestation( + /// Process all attestations in the given `block`. + /// + /// Assumes the block (and therefore it's attestations) are valid. It is a logic error to + /// provide an invalid block. + pub fn process_block( + &self, + state: &BeaconState, + block: &BeaconBlock, + ) -> Result<()> { + // Note: we never count the block as a latest message, only attestations. + // + // I (Paul H) do not have an explicit reference to this, but I derive it from this + // document: + // + // https://github.com/ethereum/eth2.0-specs/blob/v0.7.0/specs/core/0_fork-choice.md + for attestation in &block.body.attestations { + self.process_attestation_from_block(state, attestation)?; + } + + Ok(()) + } + + fn process_attestation_from_block( &self, state: &BeaconState, attestation: &Attestation, @@ -94,28 +116,6 @@ impl ForkChoice { Ok(()) } - - /// A helper function which runs `self.process_attestation` on all `Attestation` in the given `BeaconBlock`. - /// - /// Assumes the block (and therefore it's attestations) are valid. It is a logic error to - /// provide an invalid block. - pub fn process_block( - &self, - state: &BeaconState, - block: &BeaconBlock, - ) -> Result<()> { - // Note: we never count the block as a latest message, only attestations. - // - // I (Paul H) do not have an explicit reference to this, however I derive it from this - // document: - // - // https://github.com/ethereum/eth2.0-specs/blob/v0.7.0/specs/core/0_fork-choice.md - for attestation in &block.body.attestations { - self.process_attestation(state, attestation)?; - } - - Ok(()) - } } impl From for Error { diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index b4250fe0fc..4aa1370d97 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -9,6 +9,7 @@ mod persisted_beacon_chain; pub use self::beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; pub use self::checkpoint::CheckPoint; pub use self::errors::{BeaconChainError, BlockProductionError}; +pub use lmd_ghost; pub use parking_lot; pub use slot_clock; pub use state_processing::per_block_processing::errors::{ diff --git a/beacon_node/client/src/beacon_chain_types.rs b/beacon_node/client/src/beacon_chain_types.rs index c55c04b443..c923f724c8 100644 --- a/beacon_node/client/src/beacon_chain_types.rs +++ b/beacon_node/client/src/beacon_chain_types.rs @@ -1,8 +1,9 @@ use beacon_chain::{ - fork_choice::OptimizedLMDGhost, slot_clock::SystemTimeSlotClock, store::Store, BeaconChain, - BeaconChainTypes, + lmd_ghost::{LmdGhost, ThreadSafeReducedTree}, + slot_clock::SystemTimeSlotClock, + store::Store, + BeaconChain, BeaconChainTypes, }; -use fork_choice::ForkChoice; use slog::{info, Logger}; use slot_clock::SlotClock; use std::marker::PhantomData; @@ -33,7 +34,7 @@ pub struct ClientType { impl BeaconChainTypes for ClientType { type Store = S; type SlotClock = SystemTimeSlotClock; - type ForkChoice = OptimizedLMDGhost; + type LmdGhost = ThreadSafeReducedTree; type EthSpec = E; } impl InitialiseBeaconChain for ClientType {} @@ -45,8 +46,8 @@ fn maybe_load_from_store_for_testnet( log: Logger, ) -> BeaconChain where - T: BeaconChainTypes, - T::ForkChoice: ForkChoice, + T: BeaconChainTypes, + T::LmdGhost: LmdGhost, { if let Ok(Some(beacon_chain)) = BeaconChain::from_store(store.clone(), spec.clone()) { info!( @@ -74,19 +75,10 @@ where genesis_state.genesis_time, spec.seconds_per_slot, ); - // Choose the fork choice - let fork_choice = T::ForkChoice::new(store.clone()); // Genesis chain //TODO: Handle error correctly - BeaconChain::from_genesis( - store, - slot_clock, - genesis_state, - genesis_block, - spec, - fork_choice, - ) - .expect("Terminate if beacon chain generation fails") + BeaconChain::from_genesis(store, slot_clock, genesis_state, genesis_block, spec) + .expect("Terminate if beacon chain generation fails") } } diff --git a/eth2/lmd_ghost/src/lib.rs b/eth2/lmd_ghost/src/lib.rs index a50532758d..59495e382c 100644 --- a/eth2/lmd_ghost/src/lib.rs +++ b/eth2/lmd_ghost/src/lib.rs @@ -9,7 +9,7 @@ pub use reduced_tree::ThreadSafeReducedTree; pub type Result = std::result::Result; pub trait LmdGhost: Send + Sync { - fn new(store: Arc) -> Self; + fn new(store: Arc, genesis_root: Hash256) -> Self; fn process_message( &self, @@ -20,5 +20,7 @@ pub trait LmdGhost: Send + Sync { fn find_head(&self, start_block_root: Hash256, weight: F) -> Result where - F: Fn(usize) -> Option; + F: Fn(usize) -> Option + Copy; + + fn update_finalized_root(&self, new_root: Hash256) -> Result<()>; } diff --git a/eth2/lmd_ghost/src/reduced_tree.rs b/eth2/lmd_ghost/src/reduced_tree.rs index 44b34b070c..84249b603f 100644 --- a/eth2/lmd_ghost/src/reduced_tree.rs +++ b/eth2/lmd_ghost/src/reduced_tree.rs @@ -16,6 +16,7 @@ pub enum Error { NotInTree(Hash256), NoCommonAncestor((Hash256, Hash256)), StoreError(StoreError), + ValidatorWeightUnknown(usize), } impl From for Error { @@ -24,43 +25,8 @@ impl From for Error { } } -pub type Height = usize; - -#[derive(Default, Clone)] -pub struct Node { - pub parent_hash: Option, - pub children: Vec, - pub score: u64, - pub height: Height, - pub block_hash: Hash256, - pub voters: Vec, -} - -impl Node { - pub fn remove_voter(&mut self, voter: usize) -> Option { - let i = self.voters.iter().position(|&v| v == voter)?; - Some(self.voters.remove(i)) - } - - pub fn add_voter(&mut self, voter: usize) { - self.voters.push(voter); - } - - pub fn has_votes(&self) -> bool { - !self.voters.is_empty() - } -} - -impl Node { - fn does_not_have_children(&self) -> bool { - self.children.is_empty() - } -} - -#[derive(Debug, Clone, Copy)] -pub struct Vote { - hash: Hash256, - slot: Slot, +pub struct ThreadSafeReducedTree { + core: RwLock>, } impl LmdGhost for ThreadSafeReducedTree @@ -68,9 +34,9 @@ where T: Store, E: EthSpec, { - fn new(store: Arc) -> Self { + fn new(store: Arc, genesis_root: Hash256) -> Self { ThreadSafeReducedTree { - core: RwLock::new(ReducedTree::new(store)), + core: RwLock::new(ReducedTree::new(store, genesis_root)), } } @@ -86,30 +52,29 @@ where .map_err(Into::into) } - fn find_head(&self, _start_block_root: Hash256, _weight: F) -> SuperResult + fn find_head(&self, start_block_root: Hash256, weight_fn: F) -> SuperResult where - F: Fn(usize) -> Option, + F: Fn(usize) -> Option + Copy, { - unimplemented!(); + self.core + .write() + .update_weights_and_find_head(start_block_root, weight_fn) + .map_err(Into::into) + } + + fn update_finalized_root(&self, new_root: Hash256) -> SuperResult<()> { + self.core.write().update_root(new_root).map_err(Into::into) } } -impl From for String { - fn from(e: Error) -> String { - format!("{:?}", e) - } -} - -pub struct ThreadSafeReducedTree { - pub core: RwLock>, -} - -pub struct ReducedTree { +struct ReducedTree { store: Arc, /// Stores all nodes of the tree, keyed by the block hash contained in the node. nodes: HashMap, /// Maps validator indices to their latest votes. latest_votes: ElasticList>, + /// Stores the root of the tree, used for pruning. + root: Hash256, _phantom: PhantomData, } @@ -118,15 +83,54 @@ where T: Store, E: EthSpec, { - pub fn new(store: Arc) -> Self { + pub fn new(store: Arc, genesis_root: Hash256) -> Self { + let mut nodes = HashMap::new(); + + // Insert the genesis node. + nodes.insert( + genesis_root, + Node { + block_hash: genesis_root, + ..Node::default() + }, + ); + Self { store, - nodes: HashMap::new(), + nodes, latest_votes: ElasticList::default(), + root: genesis_root, _phantom: PhantomData, } } + pub fn update_root(&mut self, new_root: Hash256) -> Result<()> { + if !self.nodes.contains_key(&new_root) { + self.add_node(new_root, vec![])?; + } + + self.retain_subtree(self.root, new_root)?; + + self.root = new_root; + + Ok(()) + } + + fn retain_subtree(&mut self, current_hash: Hash256, subtree_hash: Hash256) -> Result<()> { + if current_hash != subtree_hash { + // Clone satisifies the borrow checker. + let children = self.get_node(current_hash)?.children.clone(); + + for child_hash in children { + self.retain_subtree(child_hash, subtree_hash)?; + } + + self.nodes.remove(¤t_hash); + } + + Ok(()) + } + pub fn process_message( &mut self, validator_index: usize, @@ -143,7 +147,7 @@ where } else if previous_vote.slot == slot && previous_vote.hash != block_hash { // Vote is an equivocation (double-vote), ignore it. // - // TODO: flag this as slashable. + // TODO: this is slashable. return Ok(()); } else { // Given vote is newer or different to current vote, replace the current vote. @@ -156,6 +160,76 @@ where Ok(()) } + pub fn update_weights_and_find_head( + &mut self, + start_block_root: Hash256, + weight_fn: F, + ) -> Result + where + F: Fn(usize) -> Option + Copy, + { + let _root_weight = self.update_weight(start_block_root, weight_fn)?; + + let start_node = self.get_node(start_block_root)?; + let head_node = self.find_head_from(start_node)?; + + Ok(head_node.block_hash) + } + + fn find_head_from<'a>(&'a self, start_node: &'a Node) -> Result<&'a Node> { + if start_node.does_not_have_children() { + Ok(start_node) + } else { + let children = start_node + .children + .iter() + .map(|hash| self.get_node(*hash)) + .collect::>>()?; + + // TODO: check if `max_by` is `O(n^2)`. + let best_child = children + .iter() + .max_by(|a, b| { + if a.weight != b.weight { + a.weight.cmp(&b.weight) + } else { + a.block_hash.cmp(&b.block_hash) + } + }) + // There can only be no maximum if there are no children. This code path is guarded + // against that condition. + .expect("There must be a maximally weighted node."); + + self.find_head_from(best_child) + } + } + + fn update_weight(&mut self, start_block_root: Hash256, weight_fn: F) -> Result + where + F: Fn(usize) -> Option + Copy, + { + let weight = { + let node = self.get_node(start_block_root)?.clone(); + + let mut weight = 0; + + for &child in &node.children { + weight += self.update_weight(child, weight_fn)?; + } + + for &voter in &node.voters { + weight += weight_fn(voter).ok_or_else(|| Error::ValidatorWeightUnknown(voter))?; + } + + weight + }; + + let node = self.get_mut_node(start_block_root)?; + node.weight = weight; + + Ok(weight) + } + fn remove_latest_message(&mut self, validator_index: usize) -> Result<()> { if self.latest_votes.get(validator_index).is_some() { // Unwrap is safe as prior `if` statements ensures the result is `Some`. @@ -390,6 +464,40 @@ where } } +#[derive(Default, Clone)] +pub struct Node { + pub parent_hash: Option, + pub children: Vec, + pub weight: u64, + pub block_hash: Hash256, + pub voters: Vec, +} + +impl Node { + pub fn does_not_have_children(&self) -> bool { + self.children.is_empty() + } + + pub fn remove_voter(&mut self, voter: usize) -> Option { + let i = self.voters.iter().position(|&v| v == voter)?; + Some(self.voters.remove(i)) + } + + pub fn add_voter(&mut self, voter: usize) { + self.voters.push(voter); + } + + pub fn has_votes(&self) -> bool { + !self.voters.is_empty() + } +} + +#[derive(Debug, Clone, Copy)] +pub struct Vote { + hash: Hash256, + slot: Slot, +} + /// A Vec-wrapper which will grow to match any request. /// /// E.g., a `get` or `insert` to an out-of-bounds element will cause the Vec to grow (using @@ -417,3 +525,9 @@ where self.0[i] = element; } } + +impl From for String { + fn from(e: Error) -> String { + format!("{:?}", e) + } +}