From 103103e72e120ecb0927bebf74c47068e41bae71 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Tue, 17 Nov 2020 23:11:26 +0000
Subject: [PATCH] Address queue congestion in migrator (#1923)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Issue Addressed

*Should* address #1917

## Proposed Changes

Stops the `BackgroundMigrator` rx channel from backing up with big `BeaconState` messages. Instead of sending a whole `BeaconState` in each `MigrationNotification`, only the finalized state root is sent and the migrator thread loads the state from the database itself. The migrator thread also drains its channel on each iteration, processing only the notification with the highest finalized epoch. (Standalone sketches of both patterns follow the patch.)

Looking at some logs from my Medalla node, we can see a discrepancy between the head finalized epoch and the migrator finalized epoch:

```
Nov 17 16:50:21.606 DEBG Head beacon block slot: 129214, root: 0xbc7a…0b99, finalized_epoch: 4033, finalized_root: 0xf930…6562, justified_epoch: 4035, justified_root: 0x206b…9321, service: beacon
Nov 17 16:50:21.626 DEBG Batch processed service: sync, processed_blocks: 43, last_block_slot: 129214, chain: 8274002112260436595, first_block_slot: 129153, batch_epoch: 4036
Nov 17 16:50:21.626 DEBG Chain advanced processing_target: 4036, new_start: 4036, previous_start: 4034, chain: 8274002112260436595, service: sync
Nov 17 16:50:22.162 DEBG Completed batch received awaiting_batches: 5, blocks: 47, epoch: 4048, chain: 8274002112260436595, service: sync
Nov 17 16:50:22.162 DEBG Requesting batch start_slot: 129601, end_slot: 129664, downloaded: 0, processed: 0, state: Downloading(16Uiu2HAmG3C3t1McaseReECjAF694tjVVjkDoneZEbxNhWm1nZaT, 0 blocks, 1273), epoch: 4050, chain: 8274002112260436595, service: sync
Nov 17 16:50:22.654 DEBG Database compaction complete service: beacon
Nov 17 16:50:22.655 INFO Starting database pruning new_finalized_epoch: 2193, old_finalized_epoch: 2192, service: beacon
```

The head is finalized at epoch 4033, yet the migrator is only now starting to prune for epoch 2193. I believe this indicates that the migrator rx has a backed-up queue of `MigrationNotification` items, each of which contains a `BeaconState`.

## TODO

- [x] Remove finalized state requirement for op-pool

---
 beacon_node/beacon_chain/src/beacon_chain.rs | 68 +++++++++---------
 beacon_node/beacon_chain/src/migrate.rs      | 51 +++++++------
 beacon_node/operation_pool/src/lib.rs        | 75 +++++++++++---------
 3 files changed, 108 insertions(+), 86 deletions(-)

diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 1f3a18b3e3..c97159d9a4 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -1926,25 +1926,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         };
         let new_finalized_checkpoint = new_head.beacon_state.finalized_checkpoint;
 
-        // State root of the finalized state on the epoch boundary, NOT the state
-        // of the finalized block. We need to use an iterator in case the state is beyond
-        // the reach of the new head's `state_roots` array.
-        let new_finalized_slot = new_finalized_checkpoint
-            .epoch
-            .start_slot(T::EthSpec::slots_per_epoch());
-        let new_finalized_state_root = process_results(
-            StateRootsIterator::new(self.store.clone(), &new_head.beacon_state),
-            |mut iter| {
-                iter.find_map(|(state_root, slot)| {
-                    if slot == new_finalized_slot {
-                        Some(state_root)
-                    } else {
-                        None
-                    }
-                })
-            },
-        )?
-        .ok_or_else(|| Error::MissingFinalizedStateRoot(new_finalized_slot))?;
 
         // It is an error to try to update to a head with a lesser finalized epoch.
         if new_finalized_checkpoint.epoch < old_finalized_checkpoint.epoch {
@@ -1991,7 +1972,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         });
 
         if new_finalized_checkpoint.epoch != old_finalized_checkpoint.epoch {
-            self.after_finalization(new_finalized_checkpoint, new_finalized_state_root)?;
+            // Due to race conditions, it's technically possible that the head we load here is
+            // different to the one earlier in this function.
+            //
+            // Since the head can't move backwards in terms of finalized epoch, we can only load a
+            // head with a *later* finalized state. There is no harm in this.
+            let head = self
+                .canonical_head
+                .try_read_for(HEAD_LOCK_TIMEOUT)
+                .ok_or_else(|| Error::CanonicalHeadLockTimeout)?;
+
+            // State root of the finalized state on the epoch boundary, NOT the state
+            // of the finalized block. We need to use an iterator in case the state is beyond
+            // the reach of the new head's `state_roots` array.
+            let new_finalized_slot = head
+                .beacon_state
+                .finalized_checkpoint
+                .epoch
+                .start_slot(T::EthSpec::slots_per_epoch());
+            let new_finalized_state_root = process_results(
+                StateRootsIterator::new(self.store.clone(), &head.beacon_state),
+                |mut iter| {
+                    iter.find_map(|(state_root, slot)| {
+                        if slot == new_finalized_slot {
+                            Some(state_root)
+                        } else {
+                            None
+                        }
+                    })
+                },
+            )?
+            .ok_or_else(|| Error::MissingFinalizedStateRoot(new_finalized_slot))?;
+
+            self.after_finalization(&head.beacon_state, new_finalized_state_root)?;
         }
 
         let _ = self.event_handler.register(EventKind::BeaconHeadChanged {
@@ -2072,10 +2085,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     /// Performs pruning and finality-based optimizations.
     fn after_finalization(
         &self,
-        new_finalized_checkpoint: Checkpoint,
+        head_state: &BeaconState<T::EthSpec>,
         new_finalized_state_root: Hash256,
     ) -> Result<(), Error> {
         self.fork_choice.write().prune()?;
+        let new_finalized_checkpoint = head_state.finalized_checkpoint;
 
         self.observed_block_producers.prune(
             new_finalized_checkpoint
@@ -2097,20 +2111,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             );
         });
 
-        let finalized_state = self
-            .get_state(&new_finalized_state_root, None)?
-            .ok_or_else(|| Error::MissingBeaconState(new_finalized_state_root))?;
-
-        self.op_pool.prune_all(
-            &finalized_state,
-            self.epoch()?,
-            self.head_info()?.fork,
-            &self.spec,
-        );
+        self.op_pool.prune_all(head_state, self.epoch()?);
 
         self.store_migrator.process_finalization(
             new_finalized_state_root.into(),
-            finalized_state,
             new_finalized_checkpoint,
             self.head_tracker.clone(),
         )?;
diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs
index 00a5385d04..92f8efe435 100644
--- a/beacon_node/beacon_chain/src/migrate.rs
+++ b/beacon_node/beacon_chain/src/migrate.rs
@@ -3,7 +3,7 @@ use crate::errors::BeaconChainError;
 use crate::head_tracker::{HeadTracker, SszHeadTracker};
 use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
 use parking_lot::Mutex;
-use slog::{debug, info, warn, Logger};
+use slog::{debug, error, info, warn, Logger};
 use std::collections::{HashMap, HashSet};
 use std::mem;
 use std::sync::{mpsc, Arc};
@@ -30,12 +30,7 @@ const COMPACTION_FINALITY_DISTANCE: u64 = 1024;
 pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
     db: Arc<HotColdDB<E, Hot, Cold>>,
     #[allow(clippy::type_complexity)]
-    tx_thread: Option<
-        Mutex<(
-            mpsc::Sender<MigrationNotification<E>>,
-            thread::JoinHandle<()>,
-        )>,
-    >,
+    tx_thread: Option<Mutex<(mpsc::Sender<MigrationNotification>, thread::JoinHandle<()>)>>,
     /// Genesis block root, for persisting the `PersistedBeaconChain`.
     genesis_block_root: Hash256,
     log: Logger,
@@ -78,9 +73,8 @@ pub enum PruningError {
 }
 
 /// Message sent to the migration thread containing the information it needs to run.
-pub struct MigrationNotification<E: EthSpec> {
+pub struct MigrationNotification {
     finalized_state_root: BeaconStateHash,
-    finalized_state: BeaconState<E>,
     finalized_checkpoint: Checkpoint,
     head_tracker: Arc<HeadTracker>,
     genesis_block_root: Hash256,
@@ -115,13 +109,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
     pub fn process_finalization(
         &self,
         finalized_state_root: BeaconStateHash,
-        finalized_state: BeaconState<E>,
         finalized_checkpoint: Checkpoint,
         head_tracker: Arc<HeadTracker>,
     ) -> Result<(), BeaconChainError> {
         let notif = MigrationNotification {
             finalized_state_root,
-            finalized_state,
             finalized_checkpoint,
             head_tracker,
             genesis_block_root: self.genesis_block_root,
@@ -161,13 +153,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
     }
 
     /// Perform the actual work of `process_finalization`.
-    fn run_migration(
-        db: Arc<HotColdDB<E, Hot, Cold>>,
-        notif: MigrationNotification<E>,
-        log: &Logger,
-    ) {
+    fn run_migration(db: Arc<HotColdDB<E, Hot, Cold>>, notif: MigrationNotification, log: &Logger) {
         let finalized_state_root = notif.finalized_state_root;
-        let finalized_state = notif.finalized_state;
+
+        let finalized_state = match db.get_state(&finalized_state_root.into(), None) {
+            Ok(Some(state)) => state,
+            other => {
+                error!(
+                    log,
+                    "Migrator failed to load state";
+                    "state_root" => ?finalized_state_root,
+                    "error" => ?other
+                );
+                return;
+            }
+        };
 
         let old_finalized_checkpoint = match Self::prune_abandoned_forks(
             db.clone(),
@@ -231,13 +231,22 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
     fn spawn_thread(
         db: Arc<HotColdDB<E, Hot, Cold>>,
         log: Logger,
-    ) -> (
-        mpsc::Sender<MigrationNotification<E>>,
-        thread::JoinHandle<()>,
-    ) {
+    ) -> (mpsc::Sender<MigrationNotification>, thread::JoinHandle<()>) {
         let (tx, rx) = mpsc::channel();
         let thread = thread::spawn(move || {
             while let Ok(notif) = rx.recv() {
+                // Read the rest of the messages in the channel, ultimately choosing the `notif`
+                // with the highest finalized epoch.
+                let notif = rx
+                    .try_iter()
+                    .fold(notif, |best, other: MigrationNotification| {
+                        if other.finalized_checkpoint.epoch > best.finalized_checkpoint.epoch {
+                            other
+                        } else {
+                            best
+                        }
+                    });
+
                 Self::run_migration(db.clone(), notif, &log);
             }
         });
diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs
index a22c7b085f..45c16cc2fc 100644
--- a/beacon_node/operation_pool/src/lib.rs
+++ b/beacon_node/operation_pool/src/lib.rs
@@ -13,7 +13,8 @@ use max_cover::maximum_cover;
 use parking_lot::RwLock;
 use state_processing::per_block_processing::errors::AttestationValidationError;
 use state_processing::per_block_processing::{
-    get_slashable_indices, verify_attestation_for_block_inclusion, verify_exit, VerifySignatures,
+    get_slashable_indices_modular, verify_attestation_for_block_inclusion, verify_exit,
+    VerifySignatures,
 };
 use state_processing::SigVerifiedOp;
 use std::collections::{hash_map, HashMap, HashSet};
@@ -235,31 +236,40 @@ impl<T: EthSpec> OperationPool<T> {
         (proposer_slashings, attester_slashings)
     }
 
-    /// Prune proposer slashings for all slashed or withdrawn validators.
-    pub fn prune_proposer_slashings(&self, finalized_state: &BeaconState<T>) {
+    /// Prune proposer slashings for validators which have exited at or before the finalized epoch.
+    pub fn prune_proposer_slashings(&self, head_state: &BeaconState<T>) {
         prune_validator_hash_map(
             &mut self.proposer_slashings.write(),
-            |validator| {
-                validator.slashed || validator.is_withdrawable_at(finalized_state.current_epoch())
-            },
-            finalized_state,
+            |validator| validator.exit_epoch <= head_state.finalized_checkpoint.epoch,
+            head_state,
         );
     }
 
     /// Prune attester slashings for all slashed or withdrawn validators, or attestations on another
     /// fork.
-    pub fn prune_attester_slashings(&self, finalized_state: &BeaconState<T>, head_fork: Fork) {
+    pub fn prune_attester_slashings(&self, head_state: &BeaconState<T>) {
         self.attester_slashings
             .write()
             .retain(|(slashing, fork_version)| {
-                // Any slashings for forks older than the finalized state's previous fork can be
-                // discarded. We allow the head_fork's current version too in case a fork has
-                // occurred between the finalized state and the head.
-                let fork_ok = *fork_version == finalized_state.fork.previous_version
-                    || *fork_version == finalized_state.fork.current_version
-                    || *fork_version == head_fork.current_version;
+                let previous_fork_is_finalized =
+                    head_state.finalized_checkpoint.epoch >= head_state.fork.epoch;
+                // Prune any slashings which don't match the current fork version, or the previous
+                // fork version if it is not finalized yet.
+                let fork_ok = (fork_version == &head_state.fork.current_version)
+                    || (fork_version == &head_state.fork.previous_version
+                        && !previous_fork_is_finalized);
                 // Slashings that don't slash any validators can also be dropped.
-                let slashing_ok = get_slashable_indices(finalized_state, slashing).is_ok();
+                let slashing_ok =
+                    get_slashable_indices_modular(head_state, slashing, |_, validator| {
+                        // Declare that a validator is still slashable if they have not exited prior
+                        // to the finalized epoch.
+                        //
+                        // We cannot check the `slashed` field since the `head` is not finalized and
+                        // a fork could un-slash someone.
+                        validator.exit_epoch > head_state.finalized_checkpoint.epoch
+                    })
+                    .map_or(false, |indices| !indices.is_empty());
+
                 fork_ok && slashing_ok
             });
     }
@@ -295,27 +305,26 @@ impl<T: EthSpec> OperationPool<T> {
         )
     }
 
-    /// Prune if validator has already exited at the last finalized state.
-    pub fn prune_voluntary_exits(&self, finalized_state: &BeaconState<T>, spec: &ChainSpec) {
+    /// Prune if validator has already exited at or before the finalized checkpoint of the head.
+    pub fn prune_voluntary_exits(&self, head_state: &BeaconState<T>) {
         prune_validator_hash_map(
             &mut self.voluntary_exits.write(),
-            |validator| validator.exit_epoch != spec.far_future_epoch,
-            finalized_state,
+            // This condition is slightly too loose, since there will be some finalized exits that
+            // are missed here.
+            //
+            // We choose simplicity over the gain of pruning more exits since they are small and
+            // should not be seen frequently.
+            |validator| validator.exit_epoch <= head_state.finalized_checkpoint.epoch,
+            head_state,
        );
     }
 
-    /// Prune all types of transactions given the latest finalized state and head fork.
-    pub fn prune_all(
-        &self,
-        finalized_state: &BeaconState<T>,
-        current_epoch: Epoch,
-        head_fork: Fork,
-        spec: &ChainSpec,
-    ) {
+    /// Prune all types of transactions given the latest head state.
+    pub fn prune_all(&self, head_state: &BeaconState<T>, current_epoch: Epoch) {
         self.prune_attestations(current_epoch);
-        self.prune_proposer_slashings(finalized_state);
-        self.prune_attester_slashings(finalized_state, head_fork);
-        self.prune_voluntary_exits(finalized_state, spec);
+        self.prune_proposer_slashings(head_state);
+        self.prune_attester_slashings(head_state);
+        self.prune_voluntary_exits(head_state);
     }
 
     /// Total number of voluntary exits in the pool.
@@ -392,12 +401,12 @@ where
 fn prune_validator_hash_map<T, F, E: EthSpec>(
     map: &mut HashMap<u64, T>,
     prune_if: F,
-    finalized_state: &BeaconState<E>,
+    head_state: &BeaconState<E>,
 ) where
     F: Fn(&Validator) -> bool,
 {
     map.retain(|&validator_index, _| {
-        finalized_state
+        head_state
             .validators
             .get(validator_index as usize)
             .map_or(true, |validator| !prune_if(validator))
@@ -1012,7 +1021,7 @@ mod release_tests {
         let slashing = ctxt.attester_slashing(&[1, 3, 5, 7, 9]);
         op_pool
            .insert_attester_slashing(slashing.clone().validate(state, spec).unwrap(), state.fork);
-        op_pool.prune_attester_slashings(state, state.fork);
+        op_pool.prune_attester_slashings(state);
         assert_eq!(op_pool.get_slashings(state, spec).1, vec![slashing]);
     }
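
## Sketch: draining a backed-up channel

A minimal, self-contained sketch of the queue-draining pattern the patch adds to `spawn_thread`. The `MigrationNotification` here is a hypothetical stand-in (the real struct also carries a head tracker and the genesis block root), but the `recv` plus `try_iter().fold(...)` structure mirrors the patch:

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical, pared-down stand-in for Lighthouse's `MigrationNotification`.
struct MigrationNotification {
    finalized_state_root: [u8; 32],
    finalized_epoch: u64,
}

fn main() {
    let (tx, rx) = mpsc::channel::<MigrationNotification>();

    // Simulate a backlog: finalization advanced five times while the
    // (slow) migration thread was busy.
    for epoch in 0u64..5 {
        tx.send(MigrationNotification {
            finalized_state_root: [0; 32],
            finalized_epoch: epoch,
        })
        .expect("receiver is alive");
    }
    drop(tx); // Close the channel so the worker loop exits once drained.

    let worker = thread::spawn(move || {
        // Block until at least one notification arrives...
        while let Ok(notif) = rx.recv() {
            // ...then drain everything already queued without blocking,
            // keeping only the notification with the highest finalized epoch.
            let notif = rx.try_iter().fold(notif, |best, other| {
                if other.finalized_epoch > best.finalized_epoch {
                    other
                } else {
                    best
                }
            });
            // Run the expensive migration once per backlog, not once per
            // message. The message carries only a 32-byte root; the real
            // migrator loads the corresponding state from the database here.
            println!("migrating finalized state at epoch {}", notif.finalized_epoch);
        }
    });

    worker.join().expect("worker thread panicked");
}
```

Sending all five notifications before the worker runs makes the backlog deterministic: the worker performs a single migration (for epoch 4) instead of five.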
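## Sketch: pruning by exit epoch

The op-pool changes replace `slashed`/withdrawability checks against a finalized state with a single rule against the head state: an operation can be dropped once its validator has exited at or before the finalized epoch (the `slashed` field can't be trusted because the unfinalized head may be re-orged). Below is a sketch of that rule using only standard-library types; `Validator` and `prune_ops` are simplified stand-ins for the real `Validator` type and `prune_validator_hash_map`:

```rust
use std::collections::HashMap;

/// Hypothetical, minimal validator record; the real `Validator` type
/// has many more fields (pubkey, balances, slashing status, ...).
struct Validator {
    exit_epoch: u64,
}

/// An op-pool entry keyed by validator index is dropped once that validator
/// exited at or before the finalized epoch. Entries whose index is unknown
/// are kept, mirroring the `map_or(true, ...)` in `prune_validator_hash_map`.
fn prune_ops<T>(ops: &mut HashMap<u64, T>, validators: &[Validator], finalized_epoch: u64) {
    ops.retain(|&index, _| {
        validators
            .get(index as usize)
            .map_or(true, |v| v.exit_epoch > finalized_epoch)
    });
}

fn main() {
    let validators = vec![
        Validator { exit_epoch: 10 },       // exited before finality: prune
        Validator { exit_epoch: u64::MAX }, // still active: keep
    ];
    let mut exits: HashMap<u64, &str> = HashMap::new();
    exits.insert(0, "exit for validator 0");
    exits.insert(1, "exit for validator 1");
    exits.insert(9, "exit for unknown validator"); // kept: index out of range

    prune_ops(&mut exits, &validators, 2048);
    assert!(!exits.contains_key(&0));
    assert!(exits.contains_key(&1));
    assert!(exits.contains_key(&9));
}
```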