diff --git a/Cargo.lock b/Cargo.lock index e22d17718b..0da633bf39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -569,6 +569,7 @@ version = "0.2.0" dependencies = [ "bitvec 0.20.4", "bls", + "crossbeam-channel", "derivative", "environment", "eth1", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 551d32b3d0..c22fda3b0f 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -64,6 +64,7 @@ hex = "0.4.2" exit-future = "0.2.0" unused_port = {path = "../../common/unused_port"} oneshot_broadcast = { path = "../../common/oneshot_broadcast" } +crossbeam-channel = "0.5.5" [[test]] name = "beacon_chain_tests" diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 0ba01b39c3..c2720352f2 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use slog::{debug, error, info, trace, warn, Logger}; use std::collections::{HashMap, HashSet}; use std::mem; -use std::sync::{mpsc, Arc}; +use std::sync::Arc; use std::thread; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::{migrate_database, HotColdDBError}; @@ -25,6 +25,7 @@ const MAX_COMPACTION_PERIOD_SECONDS: u64 = 604800; const MIN_COMPACTION_PERIOD_SECONDS: u64 = 7200; /// Compact after a large finality gap, if we respect `MIN_COMPACTION_PERIOD_SECONDS`. const COMPACTION_FINALITY_DISTANCE: u64 = 1024; +const BLOCKS_PER_RECONSTRUCTION: usize = 8192 * 4; /// Default number of epochs to wait between finalization migrations. pub const DEFAULT_EPOCHS_PER_RUN: u64 = 4; @@ -33,10 +34,14 @@ pub const DEFAULT_EPOCHS_PER_RUN: u64 = 4; /// to the cold database. pub struct BackgroundMigrator, Cold: ItemStore> { db: Arc>, - #[allow(clippy::type_complexity)] - tx_thread: Option, thread::JoinHandle<()>)>>, /// Record of when the last migration ran, for enforcing `epochs_per_run`. prev_migration: Arc>, + tx_thread: Option< + Mutex<( + crossbeam_channel::Sender, + thread::JoinHandle<()>, + )>, + >, /// Genesis block root, for persisting the `PersistedBeaconChain`. genesis_block_root: Hash256, log: Logger, @@ -112,11 +117,13 @@ pub enum PruningError { } /// Message sent to the migration thread containing the information it needs to run. +#[derive(Debug)] pub enum Notification { Finalization(FinalizationNotification), Reconstruction, } +#[derive(Clone, Debug)] pub struct FinalizationNotification { finalized_state_root: BeaconStateHash, finalized_checkpoint: Checkpoint, @@ -203,7 +210,7 @@ impl, Cold: ItemStore> BackgroundMigrator>, log: &Logger) { - if let Err(e) = db.reconstruct_historic_states() { + if let Err(e) = db.reconstruct_historic_states(None) { error!( log, "State reconstruction failed"; @@ -359,39 +366,83 @@ impl, Cold: ItemStore> BackgroundMigrator>, prev_migration: Arc>, log: Logger, - ) -> (mpsc::Sender, thread::JoinHandle<()>) { - let (tx, rx) = mpsc::channel(); + ) -> ( + crossbeam_channel::Sender, + thread::JoinHandle<()>, + ) { + let (tx, rx) = crossbeam_channel::unbounded(); + let tx_thread = tx.clone(); let thread = thread::spawn(move || { - while let Ok(notif) = rx.recv() { - // Read the rest of the messages in the channel, preferring any reconstruction - // notification, or the finalization notification with the greatest finalized epoch. - let notif = - rx.try_iter() - .fold(notif, |best, other: Notification| match (&best, &other) { - (Notification::Reconstruction, _) - | (_, Notification::Reconstruction) => Notification::Reconstruction, - ( - Notification::Finalization(fin1), - Notification::Finalization(fin2), - ) => { - if fin2.finalized_checkpoint.epoch > fin1.finalized_checkpoint.epoch - { - other - } else { - best - } - } - }); + let mut sel = crossbeam_channel::Select::new(); + sel.recv(&rx); + + loop { + // Block until sth is in queue + let _queue_size = sel.ready(); + let queue: Vec = rx.try_iter().collect(); + debug!( + log, + "New worker thread poll"; + "queue" => ?queue + ); + + // Find a reconstruction notification and best finalization notification. + let reconstruction_notif = queue + .iter() + .find(|n| matches!(n, Notification::Reconstruction)); + let migrate_notif = queue + .iter() + .filter_map(|n| match n { + // should not be present anymore + Notification::Reconstruction => None, + Notification::Finalization(f) => Some(f), + }) + .max_by_key(|f| f.finalized_checkpoint.epoch); + + // Do a bit of state reconstruction first if required. + if let Some(_) = reconstruction_notif { + let timer = std::time::Instant::now(); + + match db.reconstruct_historic_states(Some(BLOCKS_PER_RECONSTRUCTION)) { + Err(Error::StateReconstructionDidNotComplete) => { + info!( + log, + "Finished reconstruction batch"; + "batch_time_ms" => timer.elapsed().as_millis() + ); + // Handle send error + let _ = tx_thread.send(Notification::Reconstruction); + } + Err(e) => { + error!( + log, + "State reconstruction failed"; + "error" => ?e, + ); + } + Ok(()) => { + info!( + log, + "Finished state reconstruction"; + "batch_time_ms" => timer.elapsed().as_millis() + ); + } + } + } + + // Do the finalization migration. + if let Some(notif) = migrate_notif { + let timer = std::time::Instant::now(); - // Do not run too frequently. - if let Some(epoch) = notif.epoch() { let mut prev_migration = prev_migration.lock(); + // Do not run too frequently. + let epoch = notif.finalized_checkpoint.epoch; if let Some(prev_epoch) = prev_migration.epoch { if epoch < prev_epoch + prev_migration.epochs_per_run { debug!( log, - "Database consolidation deferred"; + "Finalization migration deferred"; "last_finalized_epoch" => prev_epoch, "new_finalized_epoch" => epoch, "epochs_per_run" => prev_migration.epochs_per_run, @@ -404,11 +455,14 @@ impl, Cold: ItemStore> BackgroundMigrator Self::run_reconstruction(db.clone(), &log), - Notification::Finalization(fin) => Self::run_migration(db.clone(), fin, &log), + Self::run_migration(db.clone(), notif.to_owned(), &log); + + info!( + log, + "Finished finalization migration"; + "running_time_ms" => timer.elapsed().as_millis() + ); } } }); @@ -534,6 +588,7 @@ impl, Cold: ItemStore> BackgroundMigrator, E: EthSpec, T: Decode + Encode>( for chunk_index in range { let key = &chunk_key(chunk_index)[..]; - let chunk = Chunk::load(store, column, key)?.ok_or(ChunkError::Missing { chunk_index })?; + let chunk = Chunk::load(store, column, key)?.ok_or(ChunkError::Missing { + column, + chunk_index, + })?; result.push(chunk); } @@ -675,6 +678,7 @@ pub enum ChunkError { actual: usize, }, Missing { + column: DBColumn, chunk_index: usize, }, MissingGenesisValue, diff --git a/beacon_node/store/src/reconstruct.rs b/beacon_node/store/src/reconstruct.rs index f9ae8eb929..58809abfaa 100644 --- a/beacon_node/store/src/reconstruct.rs +++ b/beacon_node/store/src/reconstruct.rs @@ -16,7 +16,10 @@ where Hot: KeyValueStore + ItemStore, Cold: KeyValueStore + ItemStore, { - pub fn reconstruct_historic_states(self: &Arc) -> Result<(), Error> { + pub fn reconstruct_historic_states( + self: &Arc, + num_blocks: Option, + ) -> Result<(), Error> { let mut anchor = if let Some(anchor) = self.get_anchor_info() { anchor } else { @@ -48,12 +51,15 @@ where // Use a dummy root, as we never read the block for the upper limit state. let upper_limit_block_root = Hash256::repeat_byte(0xff); - let block_root_iter = self.forwards_block_roots_iterator( - lower_limit_slot, - upper_limit_state, - upper_limit_block_root, - &self.spec, - )?; + // If `num_blocks` is not specified iterate all blocks. + let block_root_iter = self + .forwards_block_roots_iterator( + lower_limit_slot, + upper_limit_state, + upper_limit_block_root, + &self.spec, + )? + .take(num_blocks.unwrap_or(usize::MAX)); // The state to be advanced. let mut state = self