diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0137a0746b..07801eb2a7 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -6,7 +6,7 @@ use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; use lmd_ghost::LmdGhost; use log::trace; use operation_pool::DepositInsertStatus; -use operation_pool::OperationPool; +use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::{RwLock, RwLockReadGuard}; use slot_clock::SlotClock; use state_processing::per_block_processing::errors::{ @@ -147,11 +147,13 @@ impl BeaconChain { let last_finalized_root = p.canonical_head.beacon_state.finalized_root; let last_finalized_block = &p.canonical_head.beacon_block; + let op_pool = p.op_pool.into_operation_pool(&p.state, &spec); + Ok(Some(BeaconChain { spec, slot_clock, fork_choice: ForkChoice::new(store.clone(), last_finalized_block, last_finalized_root), - op_pool: OperationPool::default(), + op_pool, canonical_head: RwLock::new(p.canonical_head), state: RwLock::new(p.state), genesis_block_root: p.genesis_block_root, @@ -164,6 +166,7 @@ impl BeaconChain { pub fn persist(&self) -> Result<(), Error> { let p: PersistedBeaconChain = PersistedBeaconChain { canonical_head: self.canonical_head.read().clone(), + op_pool: PersistedOperationPool::from_operation_pool(&self.op_pool), genesis_block_root: self.genesis_block_root, state: self.state.read().clone(), }; diff --git a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs index f5bdfdee15..479e1cd8e9 100644 --- a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs +++ b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs @@ -1,4 +1,5 @@ use crate::{BeaconChainTypes, CheckPoint}; +use operation_pool::PersistedOperationPool; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use store::{DBColumn, Error as StoreError, StoreItem}; @@ -10,7 +11,7 @@ pub const BEACON_CHAIN_DB_KEY: &str = "PERSISTEDBEACONCHAINPERSISTEDBEA"; #[derive(Encode, Decode)] pub struct PersistedBeaconChain { pub canonical_head: CheckPoint, - // TODO: operations pool. + pub op_pool: PersistedOperationPool, pub genesis_block_root: Hash256, pub state: BeaconState, } diff --git a/eth2/operation_pool/Cargo.toml b/eth2/operation_pool/Cargo.toml index 99c7bfc06f..770843b754 100644 --- a/eth2/operation_pool/Cargo.toml +++ b/eth2/operation_pool/Cargo.toml @@ -12,3 +12,4 @@ parking_lot = "0.7" types = { path = "../types" } state_processing = { path = "../state_processing" } ssz = { path = "../utils/ssz" } +ssz_derive = { path = "../utils/ssz_derive" } diff --git a/eth2/operation_pool/src/attestation_id.rs b/eth2/operation_pool/src/attestation_id.rs index ba25174e65..a79023a699 100644 --- a/eth2/operation_pool/src/attestation_id.rs +++ b/eth2/operation_pool/src/attestation_id.rs @@ -1,10 +1,13 @@ use int_to_bytes::int_to_bytes8; use ssz::ssz_encode; +use ssz_derive::{Decode, Encode}; use types::{AttestationData, BeaconState, ChainSpec, Domain, Epoch, EthSpec}; /// Serialized `AttestationData` augmented with a domain to encode the fork info. -#[derive(PartialEq, Eq, Clone, Hash, Debug)] -pub struct AttestationId(Vec); +#[derive(PartialEq, Eq, Clone, Hash, Debug, PartialOrd, Ord, Encode, Decode)] +pub struct AttestationId { + v: Vec, +} /// Number of domain bytes that the end of an attestation ID is padded with. const DOMAIN_BYTES_LEN: usize = 8; @@ -18,7 +21,7 @@ impl AttestationId { let mut bytes = ssz_encode(attestation); let epoch = attestation.target_epoch; bytes.extend_from_slice(&AttestationId::compute_domain_bytes(epoch, state, spec)); - AttestationId(bytes) + AttestationId { v: bytes } } pub fn compute_domain_bytes( @@ -30,6 +33,6 @@ impl AttestationId { } pub fn domain_bytes_match(&self, domain_bytes: &[u8]) -> bool { - &self.0[self.0.len() - DOMAIN_BYTES_LEN..] == domain_bytes + &self.v[self.v.len() - DOMAIN_BYTES_LEN..] == domain_bytes } } diff --git a/eth2/operation_pool/src/lib.rs b/eth2/operation_pool/src/lib.rs index 7157ec9eec..d19d68080c 100644 --- a/eth2/operation_pool/src/lib.rs +++ b/eth2/operation_pool/src/lib.rs @@ -1,6 +1,9 @@ mod attestation; mod attestation_id; mod max_cover; +mod persistence; + +pub use persistence::PersistedOperationPool; use attestation::{earliest_attestation_validators, AttMaxCover}; use attestation_id::AttestationId; diff --git a/eth2/operation_pool/src/persistence.rs b/eth2/operation_pool/src/persistence.rs new file mode 100644 index 0000000000..78fe3ea2ed --- /dev/null +++ b/eth2/operation_pool/src/persistence.rs @@ -0,0 +1,112 @@ +use crate::attestation_id::AttestationId; +use crate::OperationPool; +use itertools::Itertools; +use parking_lot::RwLock; +use ssz::{Decode, Encode}; +use ssz_derive::{Decode, Encode}; +use types::*; + +/// Tuples for SSZ +#[derive(Encode, Decode)] +struct SszPair { + x: X, + y: Y, +} + +impl SszPair { + fn new(x: X, y: Y) -> Self { + Self { x, y } + } +} + +impl From<(X, Y)> for SszPair +where + X: Encode + Decode, + Y: Encode + Decode, +{ + fn from((x, y): (X, Y)) -> Self { + Self { x, y } + } +} + +impl Into<(X, Y)> for SszPair +where + X: Encode + Decode, + Y: Encode + Decode, +{ + fn into(self) -> (X, Y) { + (self.x, self.y) + } +} + +#[derive(Encode, Decode)] +pub struct PersistedOperationPool { + /// Mapping from attestation ID to attestation mappings, sorted by ID. + // TODO: we could save space by not storing the attestation ID, but it might + // be difficult to make that roundtrip due to eager aggregation. + attestations: Vec>>, + deposits: Vec, + /// Attester slashings sorted by their pair of attestation IDs (not stored). + attester_slashings: Vec, +} + +impl PersistedOperationPool { + pub fn from_operation_pool(operation_pool: &OperationPool) -> Self { + let attestations = operation_pool + .attestations + .read() + .iter() + .map(|(att_id, att)| SszPair::new(att_id.clone(), att.clone())) + .sorted_by(|att1, att2| Ord::cmp(&att1.x, &att2.x)) + .collect(); + + let deposits = operation_pool + .deposits + .read() + .iter() + .map(|(_, d)| d.clone()) + .collect(); + + let attester_slashings = operation_pool + .attester_slashings + .read() + .iter() + .sorted_by(|(id1, _), (id2, _)| Ord::cmp(&id1, &id2)) + .map(|(_, slashing)| slashing.clone()) + .collect(); + + Self { + attestations, + deposits, + attester_slashings, + } + } + + pub fn into_operation_pool( + self, + state: &BeaconState, + spec: &ChainSpec, + ) -> OperationPool { + let attestations = RwLock::new(self.attestations.into_iter().map(SszPair::into).collect()); + let deposits = RwLock::new(self.deposits.into_iter().map(|d| (d.index, d)).collect()); + let attester_slashings = RwLock::new( + self.attester_slashings + .into_iter() + .map(|slashing| { + ( + OperationPool::attester_slashing_id(&slashing, state, spec), + slashing, + ) + }) + .collect(), + ); + + OperationPool { + attestations, + deposits, + attester_slashings, + // TODO + ..OperationPool::new() + } + } +}