From 73c4171b526bb08407aedee2c3e5eebcb115da5a Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 26 Jun 2019 13:06:08 +1000 Subject: [PATCH] op_pool: finish persistence support --- beacon_node/beacon_chain/Cargo.toml | 3 + beacon_node/beacon_chain/src/test_utils.rs | 6 +- beacon_node/beacon_chain/tests/tests.rs | 52 +++++- eth2/operation_pool/src/lib.rs | 14 +- eth2/operation_pool/src/persistence.rs | 191 +++++++++++++-------- 5 files changed, 183 insertions(+), 83 deletions(-) diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index abf1bc6472..b6f8d1ab8c 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -26,3 +26,6 @@ state_processing = { path = "../../eth2/state_processing" } tree_hash = { path = "../../eth2/utils/tree_hash" } types = { path = "../../eth2/types" } lmd_ghost = { path = "../../eth2/lmd_ghost" } + +[dev-dependencies] +rand = "0.5.5" diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 164857e5d8..9b3f7c1cb6 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -14,6 +14,8 @@ use types::{ Hash256, Keypair, RelativeEpoch, SecretKey, Signature, Slot, }; +pub use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; + /// Indicates how the `BeaconChainHarness` should produce blocks. #[derive(Clone, Copy, Debug)] pub enum BlockStrategy { @@ -68,8 +70,8 @@ where E: EthSpec, { pub chain: BeaconChain>, - keypairs: Vec, - spec: ChainSpec, + pub keypairs: Vec, + pub spec: ChainSpec, } impl BeaconChainHarness diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 17e373ad6a..882d9f2355 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -1,16 +1,21 @@ #![cfg(not(debug_assertions))] -use beacon_chain::test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy}; +use beacon_chain::test_utils::{ + AttestationStrategy, BeaconChainHarness, BlockStrategy, CommonTypes, PersistedBeaconChain, + BEACON_CHAIN_DB_KEY, +}; use lmd_ghost::ThreadSafeReducedTree; -use store::MemoryStore; -use types::{EthSpec, MinimalEthSpec, Slot}; +use rand::Rng; +use store::{MemoryStore, Store}; +use types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; +use types::{Deposit, EthSpec, Hash256, MinimalEthSpec, Slot}; // Should ideally be divisible by 3. pub const VALIDATOR_COUNT: usize = 24; -fn get_harness( - validator_count: usize, -) -> BeaconChainHarness, MinimalEthSpec> { +type TestForkChoice = ThreadSafeReducedTree; + +fn get_harness(validator_count: usize) -> BeaconChainHarness { let harness = BeaconChainHarness::new(validator_count); // Move past the zero slot. @@ -225,3 +230,38 @@ fn does_not_finalize_without_attestation() { "no epoch should have been finalized" ); } + +#[test] +fn roundtrip_operation_pool() { + let num_blocks_produced = MinimalEthSpec::slots_per_epoch() * 5; + + let harness = get_harness(VALIDATOR_COUNT); + + // Add some attestations + harness.extend_chain( + num_blocks_produced as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + assert!(harness.chain.op_pool.num_attestations() > 0); + + // Add some deposits + let rng = &mut XorShiftRng::from_seed([66; 16]); + for _ in 0..rng.gen_range(1, VALIDATOR_COUNT) { + harness + .chain + .process_deposit(Deposit::random_for_test(rng)) + .unwrap(); + } + + // TODO: could add some other operations + harness.chain.persist().unwrap(); + + let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes()); + let p: PersistedBeaconChain> = + harness.chain.store.get(&key).unwrap().unwrap(); + + let restored_op_pool = p.op_pool.into_operation_pool(&p.state, &harness.spec); + + assert_eq!(harness.chain.op_pool, restored_op_pool); +} diff --git a/eth2/operation_pool/src/lib.rs b/eth2/operation_pool/src/lib.rs index c19b501ee7..6c6f1e7524 100644 --- a/eth2/operation_pool/src/lib.rs +++ b/eth2/operation_pool/src/lib.rs @@ -27,7 +27,7 @@ use types::{ Transfer, Validator, VoluntaryExit, }; -#[derive(Default)] +#[derive(Default, Debug)] pub struct OperationPool { /// Map from attestation ID (see below) to vectors of attestations. attestations: RwLock>>, @@ -442,6 +442,18 @@ fn prune_validator_hash_map( }); } +/// Compare two operation pools. +impl PartialEq for OperationPool { + fn eq(&self, other: &Self) -> bool { + *self.attestations.read() == *other.attestations.read() + && *self.deposits.read() == *other.deposits.read() + && *self.attester_slashings.read() == *other.attester_slashings.read() + && *self.proposer_slashings.read() == *other.proposer_slashings.read() + && *self.voluntary_exits.read() == *other.voluntary_exits.read() + && *self.transfers.read() == *other.transfers.read() + } +} + #[cfg(test)] mod tests { use super::DepositInsertStatus::*; diff --git a/eth2/operation_pool/src/persistence.rs b/eth2/operation_pool/src/persistence.rs index 78fe3ea2ed..992c14555c 100644 --- a/eth2/operation_pool/src/persistence.rs +++ b/eth2/operation_pool/src/persistence.rs @@ -1,12 +1,127 @@ use crate::attestation_id::AttestationId; use crate::OperationPool; -use itertools::Itertools; use parking_lot::RwLock; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::*; -/// Tuples for SSZ +/// SSZ-serializable version of `OperationPool`. +/// +/// Operations are stored in arbitrary order, so it's not a good idea to compare instances +/// of this type (or its encoded form) for equality. Convert back to an `OperationPool` first. +#[derive(Encode, Decode)] +pub struct PersistedOperationPool { + /// Mapping from attestation ID to attestation mappings. + // We could save space by not storing the attestation ID, but it might + // be difficult to make that roundtrip due to eager aggregation. + attestations: Vec>>, + deposits: Vec, + /// Attester slashings. + attester_slashings: Vec, + /// Proposer slashings. + proposer_slashings: Vec, + /// Voluntary exits. + voluntary_exits: Vec, + /// Transfers. + transfers: Vec, +} + +impl PersistedOperationPool { + /// Convert an `OperationPool` into serializable form. + pub fn from_operation_pool(operation_pool: &OperationPool) -> Self { + let attestations = operation_pool + .attestations + .read() + .iter() + .map(|(att_id, att)| SszPair::new(att_id.clone(), att.clone())) + .collect(); + + let deposits = operation_pool + .deposits + .read() + .iter() + .map(|(_, d)| d.clone()) + .collect(); + + let attester_slashings = operation_pool + .attester_slashings + .read() + .iter() + .map(|(_, slashing)| slashing.clone()) + .collect(); + + let proposer_slashings = operation_pool + .proposer_slashings + .read() + .iter() + .map(|(_, slashing)| slashing.clone()) + .collect(); + + let voluntary_exits = operation_pool + .voluntary_exits + .read() + .iter() + .map(|(_, exit)| exit.clone()) + .collect(); + + let transfers = operation_pool.transfers.read().iter().cloned().collect(); + + Self { + attestations, + deposits, + attester_slashings, + proposer_slashings, + voluntary_exits, + transfers, + } + } + + /// Reconstruct an `OperationPool`. + pub fn into_operation_pool( + self, + state: &BeaconState, + spec: &ChainSpec, + ) -> OperationPool { + let attestations = RwLock::new(self.attestations.into_iter().map(SszPair::into).collect()); + let deposits = RwLock::new(self.deposits.into_iter().map(|d| (d.index, d)).collect()); + let attester_slashings = RwLock::new( + self.attester_slashings + .into_iter() + .map(|slashing| { + ( + OperationPool::attester_slashing_id(&slashing, state, spec), + slashing, + ) + }) + .collect(), + ); + let proposer_slashings = RwLock::new( + self.proposer_slashings + .into_iter() + .map(|slashing| (slashing.proposer_index, slashing)) + .collect(), + ); + let voluntary_exits = RwLock::new( + self.voluntary_exits + .into_iter() + .map(|exit| (exit.validator_index, exit)) + .collect(), + ); + let transfers = RwLock::new(self.transfers.into_iter().collect()); + + OperationPool { + attestations, + deposits, + attester_slashings, + proposer_slashings, + voluntary_exits, + transfers, + _phantom: Default::default(), + } + } +} + +/// Tuples for SSZ. #[derive(Encode, Decode)] struct SszPair { x: X, @@ -38,75 +153,3 @@ where (self.x, self.y) } } - -#[derive(Encode, Decode)] -pub struct PersistedOperationPool { - /// Mapping from attestation ID to attestation mappings, sorted by ID. - // TODO: we could save space by not storing the attestation ID, but it might - // be difficult to make that roundtrip due to eager aggregation. - attestations: Vec>>, - deposits: Vec, - /// Attester slashings sorted by their pair of attestation IDs (not stored). - attester_slashings: Vec, -} - -impl PersistedOperationPool { - pub fn from_operation_pool(operation_pool: &OperationPool) -> Self { - let attestations = operation_pool - .attestations - .read() - .iter() - .map(|(att_id, att)| SszPair::new(att_id.clone(), att.clone())) - .sorted_by(|att1, att2| Ord::cmp(&att1.x, &att2.x)) - .collect(); - - let deposits = operation_pool - .deposits - .read() - .iter() - .map(|(_, d)| d.clone()) - .collect(); - - let attester_slashings = operation_pool - .attester_slashings - .read() - .iter() - .sorted_by(|(id1, _), (id2, _)| Ord::cmp(&id1, &id2)) - .map(|(_, slashing)| slashing.clone()) - .collect(); - - Self { - attestations, - deposits, - attester_slashings, - } - } - - pub fn into_operation_pool( - self, - state: &BeaconState, - spec: &ChainSpec, - ) -> OperationPool { - let attestations = RwLock::new(self.attestations.into_iter().map(SszPair::into).collect()); - let deposits = RwLock::new(self.deposits.into_iter().map(|d| (d.index, d)).collect()); - let attester_slashings = RwLock::new( - self.attester_slashings - .into_iter() - .map(|slashing| { - ( - OperationPool::attester_slashing_id(&slashing, state, spec), - slashing, - ) - }) - .collect(), - ); - - OperationPool { - attestations, - deposits, - attester_slashings, - // TODO - ..OperationPool::new() - } - } -}