diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 41fcb1128b..5e83fdd81b 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -89,7 +89,7 @@ pub struct BeaconChain { pub state_store: Arc>, pub slot_clock: U, pub attestation_aggregator: RwLock, - pub op_pool: RwLock, + pub op_pool: OperationPool, canonical_head: RwLock, finalized_head: RwLock, pub state: RwLock, @@ -143,7 +143,7 @@ where state_store, slot_clock, attestation_aggregator, - op_pool: RwLock::new(OperationPool::new()), + op_pool: OperationPool::new(), state: RwLock::new(genesis_state), finalized_head, canonical_head, @@ -545,7 +545,6 @@ where attestation: Attestation, ) -> Result<(), AttestationValidationError> { self.op_pool - .write() .insert_attestation(attestation, &*self.state.read(), &self.spec) } @@ -555,21 +554,18 @@ where deposit: Deposit, ) -> Result { self.op_pool - .write() .insert_deposit(deposit, &*self.state.read(), &self.spec) } /// Accept some exit and queue it for inclusion in an appropriate block. pub fn process_voluntary_exit(&self, exit: VoluntaryExit) -> Result<(), ExitValidationError> { self.op_pool - .write() .insert_voluntary_exit(exit, &*self.state.read(), &self.spec) } /// Accept some transfer and queue it for inclusion in an appropriate block. pub fn process_transfer(&self, transfer: Transfer) -> Result<(), TransferValidationError> { self.op_pool - .write() .insert_transfer(transfer, &*self.state.read(), &self.spec) } @@ -578,11 +574,8 @@ where &self, proposer_slashing: ProposerSlashing, ) -> Result<(), ProposerSlashingValidationError> { - self.op_pool.write().insert_proposer_slashing( - proposer_slashing, - &*self.state.read(), - &self.spec, - ) + self.op_pool + .insert_proposer_slashing(proposer_slashing, &*self.state.read(), &self.spec) } /// Accept some attester slashing and queue it for inclusion in an appropriate block. @@ -590,11 +583,8 @@ where &self, attester_slashing: AttesterSlashing, ) -> Result<(), AttesterSlashingValidationError> { - self.op_pool.write().insert_attester_slashing( - attester_slashing, - &*self.state.read(), - &self.spec, - ) + self.op_pool + .insert_attester_slashing(attester_slashing, &*self.state.read(), &self.spec) } /// Accept some block and attempt to add it to block DAG. @@ -705,24 +695,12 @@ where trace!("Finding attestations for new block..."); - let attestations = self - .op_pool - .read() - .get_attestations(&*self.state.read(), &self.spec); - - trace!( - "Inserting {} attestation(s) into new block.", - attestations.len() - ); - let previous_block_root = *state .get_block_root(state.slot - 1, &self.spec) .map_err(|_| BlockProductionError::UnableToGetBlockRootFromState)?; - let (proposer_slashings, attester_slashings) = self - .op_pool - .read() - .get_slashings(&*self.state.read(), &self.spec); + let (proposer_slashings, attester_slashings) = + self.op_pool.get_slashings(&*self.state.read(), &self.spec); let mut block = BeaconBlock { slot: state.slot, @@ -738,19 +716,14 @@ where }, proposer_slashings, attester_slashings, - attestations, - deposits: self + attestations: self .op_pool - .read() - .get_deposits(&*self.state.read(), &self.spec), + .get_attestations(&*self.state.read(), &self.spec), + deposits: self.op_pool.get_deposits(&*self.state.read(), &self.spec), voluntary_exits: self .op_pool - .read() .get_voluntary_exits(&*self.state.read(), &self.spec), - transfers: self - .op_pool - .read() - .get_transfers(&*self.state.read(), &self.spec), + transfers: self.op_pool.get_transfers(&*self.state.read(), &self.spec), }, }; diff --git a/eth2/operation_pool/Cargo.toml b/eth2/operation_pool/Cargo.toml index 07cb618649..67d13013ca 100644 --- a/eth2/operation_pool/Cargo.toml +++ b/eth2/operation_pool/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [dependencies] int_to_bytes = { path = "../utils/int_to_bytes" } itertools = "0.8" +parking_lot = "0.7" types = { path = "../types" } state_processing = { path = "../state_processing" } ssz = { path = "../utils/ssz" } diff --git a/eth2/operation_pool/src/lib.rs b/eth2/operation_pool/src/lib.rs index c3de95b48d..c42527b608 100644 --- a/eth2/operation_pool/src/lib.rs +++ b/eth2/operation_pool/src/lib.rs @@ -1,5 +1,6 @@ use int_to_bytes::int_to_bytes8; use itertools::Itertools; +use parking_lot::RwLock; use ssz::ssz_encode; use state_processing::per_block_processing::errors::{ AttestationValidationError, AttesterSlashingValidationError, DepositValidationError, @@ -26,21 +27,21 @@ const VERIFY_DEPOSIT_PROOFS: bool = false; // TODO: enable this #[derive(Default)] pub struct OperationPool { /// Map from attestation ID (see below) to vectors of attestations. - attestations: HashMap>, + attestations: RwLock>>, /// Map from deposit index to deposit data. // NOTE: We assume that there is only one deposit per index // because the Eth1 data is updated (at most) once per epoch, // and the spec doesn't seem to accomodate for re-orgs on a time-frame // longer than an epoch - deposits: BTreeMap, + deposits: RwLock>, /// Map from two attestation IDs to a slashing for those IDs. - attester_slashings: HashMap<(AttestationId, AttestationId), AttesterSlashing>, + attester_slashings: RwLock>, /// Map from proposer index to slashing. - proposer_slashings: HashMap, + proposer_slashings: RwLock>, /// Map from exiting validator to their exit data. - voluntary_exits: HashMap, + voluntary_exits: RwLock>, /// Set of transfers. - transfers: HashSet, + transfers: RwLock>, } /// Serialized `AttestationData` augmented with a domain to encode the fork info. @@ -109,7 +110,7 @@ impl OperationPool { /// Insert an attestation into the pool, aggregating it with existing attestations if possible. pub fn insert_attestation( - &mut self, + &self, attestation: Attestation, state: &BeaconState, spec: &ChainSpec, @@ -119,7 +120,10 @@ impl OperationPool { let id = AttestationId::from_data(&attestation.data, state, spec); - let existing_attestations = match self.attestations.entry(id) { + // Take a write lock on the attestations map. + let mut attestations = self.attestations.write(); + + let existing_attestations = match attestations.entry(id) { hash_map::Entry::Vacant(entry) => { entry.insert(vec![attestation]); return Ok(()); @@ -146,7 +150,11 @@ impl OperationPool { /// Total number of attestations in the pool, including attestations for the same data. pub fn num_attestations(&self) -> usize { - self.attestations.values().map(|atts| atts.len()).sum() + self.attestations + .read() + .values() + .map(|atts| atts.len()) + .sum() } /// Get a list of attestations for inclusion in a block. @@ -157,6 +165,7 @@ impl OperationPool { let prev_domain_bytes = AttestationId::compute_domain_bytes(prev_epoch, state, spec); let curr_domain_bytes = AttestationId::compute_domain_bytes(current_epoch, state, spec); self.attestations + .read() .iter() .filter(|(key, _)| { key.domain_bytes_match(&prev_domain_bytes) @@ -180,8 +189,8 @@ impl OperationPool { // TODO: we could probably prune other attestations here: // - ones that are completely covered by attestations included in the state // - maybe ones invalidated by the confirmation of one fork over another - pub fn prune_attestations(&mut self, finalized_state: &BeaconState, spec: &ChainSpec) { - self.attestations.retain(|_, attestations| { + pub fn prune_attestations(&self, finalized_state: &BeaconState, spec: &ChainSpec) { + self.attestations.write().retain(|_, attestations| { // All the attestations in this bucket have the same data, so we only need to // check the first one. attestations.first().map_or(false, |att| { @@ -194,14 +203,14 @@ impl OperationPool { /// /// No two distinct deposits should be added with the same index. pub fn insert_deposit( - &mut self, + &self, deposit: Deposit, state: &BeaconState, spec: &ChainSpec, ) -> Result { use DepositInsertStatus::*; - match self.deposits.entry(deposit.index) { + match self.deposits.write().entry(deposit.index) { Entry::Vacant(entry) => { verify_deposit(state, &deposit, VERIFY_DEPOSIT_PROOFS, spec)?; entry.insert(deposit); @@ -224,27 +233,26 @@ impl OperationPool { pub fn get_deposits(&self, state: &BeaconState, spec: &ChainSpec) -> Vec { let start_idx = state.deposit_index; (start_idx..start_idx + spec.max_deposits) - .map(|idx| self.deposits.get(&idx)) + .map(|idx| self.deposits.read().get(&idx).cloned()) .take_while(Option::is_some) .flatten() - .cloned() .collect() } /// Remove all deposits with index less than the deposit index of the latest finalised block. - pub fn prune_deposits(&mut self, state: &BeaconState) -> BTreeMap { - let deposits_keep = self.deposits.split_off(&state.deposit_index); - std::mem::replace(&mut self.deposits, deposits_keep) + pub fn prune_deposits(&self, state: &BeaconState) -> BTreeMap { + let deposits_keep = self.deposits.write().split_off(&state.deposit_index); + std::mem::replace(&mut self.deposits.write(), deposits_keep) } /// The number of deposits stored in the pool. pub fn num_deposits(&self) -> usize { - self.deposits.len() + self.deposits.read().len() } /// Insert a proposer slashing into the pool. pub fn insert_proposer_slashing( - &mut self, + &self, slashing: ProposerSlashing, state: &BeaconState, spec: &ChainSpec, @@ -253,6 +261,7 @@ impl OperationPool { // because they could *become* known later verify_proposer_slashing(&slashing, state, spec)?; self.proposer_slashings + .write() .insert(slashing.proposer_index, slashing); Ok(()) } @@ -273,14 +282,14 @@ impl OperationPool { /// Insert an attester slashing into the pool. pub fn insert_attester_slashing( - &mut self, + &self, slashing: AttesterSlashing, state: &BeaconState, spec: &ChainSpec, ) -> Result<(), AttesterSlashingValidationError> { verify_attester_slashing(state, &slashing, true, spec)?; let id = Self::attester_slashing_id(&slashing, state, spec); - self.attester_slashings.insert(id, slashing); + self.attester_slashings.write().insert(id, slashing); Ok(()) } @@ -295,7 +304,7 @@ impl OperationPool { spec: &ChainSpec, ) -> (Vec, Vec) { let proposer_slashings = filter_limit_operations( - self.proposer_slashings.values(), + self.proposer_slashings.read().values(), |slashing| { state .validator_registry @@ -314,6 +323,7 @@ impl OperationPool { let attester_slashings = self .attester_slashings + .read() .iter() .filter(|(id, slashing)| { // Check the fork. @@ -345,9 +355,9 @@ impl OperationPool { } /// Prune proposer slashings for all slashed or withdrawn validators. - pub fn prune_proposer_slashings(&mut self, finalized_state: &BeaconState, spec: &ChainSpec) { + pub fn prune_proposer_slashings(&self, finalized_state: &BeaconState, spec: &ChainSpec) { prune_validator_hash_map( - &mut self.proposer_slashings, + &mut self.proposer_slashings.write(), |validator| { validator.slashed || validator.is_withdrawable_at(finalized_state.current_epoch(spec)) @@ -358,8 +368,8 @@ impl OperationPool { /// Prune attester slashings for all slashed or withdrawn validators, or attestations on another /// fork. - pub fn prune_attester_slashings(&mut self, finalized_state: &BeaconState, spec: &ChainSpec) { - self.attester_slashings.retain(|id, slashing| { + pub fn prune_attester_slashings(&self, finalized_state: &BeaconState, spec: &ChainSpec) { + self.attester_slashings.write().retain(|id, slashing| { let fork_ok = &Self::attester_slashing_id(slashing, finalized_state, spec) == id; let curr_epoch = finalized_state.current_epoch(spec); let slashing_ok = gather_attester_slashing_indices_modular( @@ -375,29 +385,31 @@ impl OperationPool { /// Insert a voluntary exit, validating it almost-entirely (future exits are permitted). pub fn insert_voluntary_exit( - &mut self, + &self, exit: VoluntaryExit, state: &BeaconState, spec: &ChainSpec, ) -> Result<(), ExitValidationError> { verify_exit_time_independent_only(state, &exit, spec)?; - self.voluntary_exits.insert(exit.validator_index, exit); + self.voluntary_exits + .write() + .insert(exit.validator_index, exit); Ok(()) } /// Get a list of voluntary exits for inclusion in a block. pub fn get_voluntary_exits(&self, state: &BeaconState, spec: &ChainSpec) -> Vec { filter_limit_operations( - self.voluntary_exits.values(), + self.voluntary_exits.read().values(), |exit| verify_exit(state, exit, spec).is_ok(), spec.max_voluntary_exits, ) } /// Prune if validator has already exited at the last finalized state. - pub fn prune_voluntary_exits(&mut self, finalized_state: &BeaconState, spec: &ChainSpec) { + pub fn prune_voluntary_exits(&self, finalized_state: &BeaconState, spec: &ChainSpec) { prune_validator_hash_map( - &mut self.voluntary_exits, + &mut self.voluntary_exits.write(), |validator| validator.is_exited_at(finalized_state.current_epoch(spec)), finalized_state, ); @@ -405,7 +417,7 @@ impl OperationPool { /// Insert a transfer into the pool, checking it for validity in the process. pub fn insert_transfer( - &mut self, + &self, transfer: Transfer, state: &BeaconState, spec: &ChainSpec, @@ -414,7 +426,7 @@ impl OperationPool { // it before we insert into the HashSet, we can't end up with duplicate // transactions. verify_transfer_time_independent_only(state, &transfer, spec)?; - self.transfers.insert(transfer); + self.transfers.write().insert(transfer); Ok(()) } @@ -423,6 +435,7 @@ impl OperationPool { // dependencies between transfers in the same block e.g. A pays B, B pays C pub fn get_transfers(&self, state: &BeaconState, spec: &ChainSpec) -> Vec { self.transfers + .read() .iter() .filter(|transfer| verify_transfer(state, transfer, spec).is_ok()) .sorted_by_key(|transfer| std::cmp::Reverse(transfer.fee)) @@ -432,16 +445,14 @@ impl OperationPool { } /// Prune the set of transfers by removing all those whose slot has already passed. - pub fn prune_transfers(&mut self, finalized_state: &BeaconState) { - self.transfers = self - .transfers - .drain() - .filter(|transfer| transfer.slot > finalized_state.slot) - .collect(); + pub fn prune_transfers(&self, finalized_state: &BeaconState) { + self.transfers + .write() + .retain(|transfer| transfer.slot > finalized_state.slot) } /// Prune all types of transactions given the latest finalized state. - pub fn prune_all(&mut self, finalized_state: &BeaconState, spec: &ChainSpec) { + pub fn prune_all(&self, finalized_state: &BeaconState, spec: &ChainSpec) { self.prune_attestations(finalized_state, spec); self.prune_deposits(finalized_state); self.prune_proposer_slashings(finalized_state, spec); @@ -497,7 +508,7 @@ mod tests { fn insert_deposit() { let rng = &mut XorShiftRng::from_seed([42; 16]); let (ref spec, ref state) = test_state(rng); - let mut op_pool = OperationPool::new(); + let op_pool = OperationPool::new(); let deposit1 = make_deposit(rng, state, spec); let mut deposit2 = make_deposit(rng, state, spec); deposit2.index = deposit1.index; @@ -520,7 +531,7 @@ mod tests { fn get_deposits_max() { let rng = &mut XorShiftRng::from_seed([42; 16]); let (spec, mut state) = test_state(rng); - let mut op_pool = OperationPool::new(); + let op_pool = OperationPool::new(); let start = 10000; let max_deposits = spec.max_deposits; let extra = 5; @@ -550,7 +561,7 @@ mod tests { fn prune_deposits() { let rng = &mut XorShiftRng::from_seed([42; 16]); let (spec, state) = test_state(rng); - let mut op_pool = OperationPool::new(); + let op_pool = OperationPool::new(); let start1 = 100; // test is super slow in debug mode if this parameter is too high @@ -734,7 +745,7 @@ mod tests { fn attestation_aggregation_insert_get_prune() { let spec = &ChainSpec::foundation(); let (ref mut state, ref keypairs) = attestation_test_state(spec, 1); - let mut op_pool = OperationPool::new(); + let op_pool = OperationPool::new(); let slot = state.slot - 1; let committees = state @@ -765,7 +776,7 @@ mod tests { } } - assert_eq!(op_pool.attestations.len(), committees.len()); + assert_eq!(op_pool.attestations.read().len(), committees.len()); assert_eq!(op_pool.num_attestations(), committees.len()); // Before the min attestation inclusion delay, get_attestations shouldn't return anything. @@ -799,7 +810,7 @@ mod tests { fn attestation_duplicate() { let spec = &ChainSpec::foundation(); let (ref mut state, ref keypairs) = attestation_test_state(spec, 1); - let mut op_pool = OperationPool::new(); + let op_pool = OperationPool::new(); let slot = state.slot - 1; let committees = state @@ -825,7 +836,7 @@ mod tests { fn attestation_pairwise_overlapping() { let spec = &ChainSpec::foundation(); let (ref mut state, ref keypairs) = attestation_test_state(spec, 1); - let mut op_pool = OperationPool::new(); + let op_pool = OperationPool::new(); let slot = state.slot - 1; let committees = state @@ -854,7 +865,7 @@ mod tests { // The attestations should get aggregated into two attestations that comprise all // validators. - assert_eq!(op_pool.attestations.len(), committees.len()); + assert_eq!(op_pool.attestations.read().len(), committees.len()); assert_eq!(op_pool.num_attestations(), 2 * committees.len()); } @@ -869,7 +880,7 @@ mod tests { let small_step_size = 2; let big_step_size = 4; let (ref mut state, ref keypairs) = attestation_test_state(spec, big_step_size); - let mut op_pool = OperationPool::new(); + let op_pool = OperationPool::new(); let slot = state.slot - 1; let committees = state @@ -907,7 +918,7 @@ mod tests { let num_small = target_committee_size / small_step_size; let num_big = target_committee_size / big_step_size; - assert_eq!(op_pool.attestations.len(), committees.len()); + assert_eq!(op_pool.attestations.read().len(), committees.len()); assert_eq!( op_pool.num_attestations(), (num_small + num_big) * committees.len()