diff --git a/Cargo.lock b/Cargo.lock index 486a7f1d77..915c657a9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -86,7 +86,7 @@ checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8" dependencies = [ "cfg-if", "cipher", - "cpufeatures 0.2.2", + "cpufeatures 0.2.5", "ctr", "opaque-debug", ] @@ -772,7 +772,7 @@ checksum = "01b72a433d0cf2aef113ba70f62634c56fddb0f244e6377185c56a7cadbd8f91" dependencies = [ "cfg-if", "cipher", - "cpufeatures 0.2.2", + "cpufeatures 0.2.5", "zeroize", ] @@ -968,6 +968,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "cpufeatures" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66c99696f6c9dd7f35d486b9d04d7e6e202aa3e8c40d553f2fdf5e7e0c6a71ef" +dependencies = [ + "libc", +] + [[package]] name = "cpufeatures" version = "0.2.5" @@ -1765,7 +1774,7 @@ dependencies = [ name = "eth2_hashing" version = "0.3.0" dependencies = [ - "cpufeatures 0.2.2", + "cpufeatures 0.2.5", "lazy_static", "ring", "rustc-hex", @@ -3088,9 +3097,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.57" +version = "0.3.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "671a26f820db17c2a2750743f1dd03bafd15b98c9f30c7c2628c024c05d73397" +checksum = "49409df3e3bf0856b916e2ceaca09ee28e6871cf7d9ce97a692cacfdb2a25a47" dependencies = [ "wasm-bindgen", ] @@ -3373,8 +3382,8 @@ dependencies = [ "multiaddr 0.14.0", "multihash 0.16.2", "multistream-select 0.11.0", - "parking_lot 0.12.1", - "pin-project 1.0.11", + "parking_lot 0.12.0", + "pin-project 1.0.10", "prost 0.11.0", "prost-build 0.11.1", "rand 0.8.5", @@ -3595,7 +3604,7 @@ checksum = "b74ec8dc042b583f0b2b93d52917f3b374c1e4b1cfa79ee74c7672c41257694c" dependencies = [ "futures", "libp2p-core 0.36.0", - "parking_lot 0.12.1", + "parking_lot 0.12.0", "thiserror", "yamux", ] @@ -4824,7 +4833,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "048aeb476be11a4b6ca432ca569e375810de9294ae78f4774e78ea98a9246ede" dependencies = [ - "cpufeatures 0.2.2", + "cpufeatures 0.2.5", "opaque-debug", "universal-hash", ] @@ -4836,7 +4845,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8419d2b623c7c0896ff2d5d96e2cb4ede590fed28fcc34934f4c33c036e620a1" dependencies = [ "cfg-if", - "cpufeatures 0.2.2", + "cpufeatures 0.2.5", "opaque-debug", "universal-hash", ] @@ -4957,7 +4966,7 @@ checksum = "3c473049631c233933d6286c88bbb7be30e62ec534cf99a9ae0079211f7fa603" dependencies = [ "dtoa", "itoa 1.0.2", - "parking_lot 0.12.1", + "parking_lot 0.12.0", "prometheus-client-derive-text-encode", ] @@ -5944,7 +5953,7 @@ checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" dependencies = [ "block-buffer 0.9.0", "cfg-if", - "cpufeatures 0.2.2", + "cpufeatures 0.2.5", "digest 0.9.0", "opaque-debug", ] @@ -5956,7 +5965,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f" dependencies = [ "cfg-if", - "cpufeatures 0.2.2", + "cpufeatures 0.2.5", "digest 0.10.3", ] @@ -5968,7 +5977,7 @@ checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" dependencies = [ "block-buffer 0.9.0", "cfg-if", - "cpufeatures 0.2.2", + "cpufeatures 0.2.5", "digest 0.9.0", "opaque-debug", ] @@ -5980,7 +5989,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676" dependencies = [ "cfg-if", - "cpufeatures 0.2.2", + "cpufeatures 0.2.5", "digest 0.10.3", ] @@ -6359,6 +6368,12 @@ dependencies = [ "syn", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "state_processing" version = "0.2.0" @@ -7577,9 +7592,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.80" +version = "0.2.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27370197c907c55e3f1a9fbe26f44e937fe6451368324e009cba39e139dc08ad" +checksum = "eaf9f5aceeec8be17c128b2e93e031fb8a4d469bb9c4ae2d7dc1888b26887268" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -7587,13 +7602,13 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.80" +version = "0.2.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53e04185bfa3a779273da532f5025e33398409573f348985af9a1cbf3774d3f4" +checksum = "4c8ffb332579b0557b52d268b91feab8df3615f265d5270fec2a8c95b17c1142" dependencies = [ "bumpalo", - "lazy_static", "log", + "once_cell", "proc-macro2", "quote", "syn", @@ -7614,9 +7629,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.80" +version = "0.2.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17cae7ff784d7e83a2fe7611cfe766ecf034111b49deb850a3dc7699c08251f5" +checksum = "052be0f94026e6cbc75cdefc9bae13fd6052cdcaf532fa6c45e7ae33a1e6c810" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -7624,9 +7639,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.80" +version = "0.2.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99ec0dc7a4756fffc231aab1b9f2f578d23cd391390ab27f952ae0c9b3ece20b" +checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c" dependencies = [ "proc-macro2", "quote", @@ -7637,9 +7652,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.80" +version = "0.2.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d554b7f530dee5964d9a9468d95c1f8b8acae4f282807e7d27d4b03099a46744" +checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f" [[package]] name = "wasm-bindgen-test" @@ -8037,18 +8052,18 @@ dependencies = [ [[package]] name = "zstd" -version = "0.10.2+zstd.1.5.2" +version = "0.11.2+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4a6bd64f22b5e3e94b4e238669ff9f10815c27a5180108b849d24174a83847" +checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" dependencies = [ "zstd-safe", ] [[package]] name = "zstd-safe" -version = "4.1.6+zstd.1.5.2" +version = "5.0.2+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94b61c51bb270702d6167b8ce67340d2754b088d0c091b06e593aa772c3ee9bb" +checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" dependencies = [ "libc", "zstd-sys", @@ -8056,9 +8071,9 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "1.6.3+zstd.1.5.2" +version = "2.0.1+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc49afa5c8d634e75761feda8c592051e7eeb4683ba827211eb0d731d3402ea8" +checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b" dependencies = [ "cc", "libc", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 54f912dfab..52f0440f0a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -680,7 +680,9 @@ impl BeaconChain { let root = self.block_root_at_slot(request_slot, skips)?; if let Some(block_root) = root { - Ok(self.store.get_blinded_block(&block_root)?) + Ok(self + .store + .get_blinded_block(&block_root, Some(request_slot))?) } else { Ok(None) } @@ -906,7 +908,7 @@ impl BeaconChain { ) -> Result>, Error> { // Load block from database, returning immediately if we have the full block w payload // stored. - let blinded_block = match self.store.try_get_full_block(block_root)? { + let blinded_block = match self.store.try_get_full_block(block_root, None)? { Some(DatabaseBlock::Full(block)) => return Ok(Some(block)), Some(DatabaseBlock::Blinded(block)) => block, None => return Ok(None), @@ -962,7 +964,7 @@ impl BeaconChain { &self, block_root: &Hash256, ) -> Result>, Error> { - Ok(self.store.get_blinded_block(block_root)?) + Ok(self.store.get_blinded_block(block_root, None)?) } /// Returns the state at the given root, if any. @@ -4582,7 +4584,7 @@ impl BeaconChain { let beacon_block = self .store - .get_blinded_block(&beacon_block_root)? + .get_blinded_block(&beacon_block_root, None)? .ok_or_else(|| { Error::DBInconsistent(format!("Missing block {}", beacon_block_root)) })?; diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index a21db19725..dc7bcacd17 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -323,7 +323,7 @@ where metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES); let justified_block = self .store - .get_blinded_block(&self.justified_checkpoint.root) + .get_blinded_block(&self.justified_checkpoint.root, None) .map_err(Error::FailedToReadBlock)? .ok_or(Error::MissingBlock(self.justified_checkpoint.root))? .deconstruct() diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index dfb43f11c6..fa91e9ab87 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -252,7 +252,7 @@ where .ok_or("Fork choice not found in store")?; let genesis_block = store - .get_blinded_block(&chain.genesis_block_root) + .get_blinded_block(&chain.genesis_block_root, Some(Slot::new(0))) .map_err(|e| descriptive_db_error("genesis block", &e))? .ok_or("Genesis block not found in store")?; let genesis_state = store @@ -303,6 +303,7 @@ where .ok_or("set_genesis_state requires a store")?; let beacon_block = genesis_block(&mut beacon_state, &self.spec)?; + let blinded_block = beacon_block.clone_as_blinded(); beacon_state .build_all_caches(&self.spec) @@ -319,12 +320,12 @@ where .put_state(&beacon_state_root, &beacon_state) .map_err(|e| format!("Failed to store genesis state: {:?}", e))?; store - .put_block(&beacon_block_root, beacon_block.clone()) + .put_cold_blinded_block(&beacon_block_root, &blinded_block) .map_err(|e| format!("Failed to store genesis block: {:?}", e))?; // Store the genesis block under the `ZERO_HASH` key. store - .put_block(&Hash256::zero(), beacon_block.clone()) + .put_cold_blinded_block(&Hash256::zero(), &blinded_block) .map_err(|e| { format!( "Failed to store genesis block under 0x00..00 alias: {:?}", @@ -634,7 +635,7 @@ where // Try to decode the head block according to the current fork, if that fails, try // to backtrack to before the most recent fork. let (head_block_root, head_block, head_reverted) = - match store.get_full_block(&initial_head_block_root) { + match store.get_full_block(&initial_head_block_root, None) { Ok(Some(block)) => (initial_head_block_root, block, false), Ok(None) => return Err("Head block not found in store".into()), Err(StoreError::SszDecodeError(_)) => { @@ -1060,7 +1061,7 @@ mod test { assert_eq!( chain .store - .get_blinded_block(&Hash256::zero()) + .get_blinded_block(&Hash256::zero(), None) .expect("should read db") .expect("should find genesis block"), block.clone_as_blinded(), diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 6dc8285034..47baeba7d9 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -267,7 +267,7 @@ impl CanonicalHead { let fork_choice_view = fork_choice.cached_fork_choice_view(); let beacon_block_root = fork_choice_view.head_block_root; let beacon_block = store - .get_full_block(&beacon_block_root)? + .get_full_block(&beacon_block_root, None)? .ok_or(Error::MissingBeaconBlock(beacon_block_root))?; let beacon_state_root = beacon_block.state_root(); let beacon_state = store @@ -623,7 +623,7 @@ impl BeaconChain { let mut new_snapshot = { let beacon_block = self .store - .get_full_block(&new_view.head_block_root)? + .get_full_block(&new_view.head_block_root, None)? .ok_or(Error::MissingBeaconBlock(new_view.head_block_root))?; // FIXME(sproul): use advanced state? diff --git a/beacon_node/beacon_chain/src/fork_revert.rs b/beacon_node/beacon_chain/src/fork_revert.rs index 3d48dfd8f6..8942915c9d 100644 --- a/beacon_node/beacon_chain/src/fork_revert.rs +++ b/beacon_node/beacon_chain/src/fork_revert.rs @@ -108,7 +108,7 @@ pub fn reset_fork_choice_to_finalization, Cold: It let finalized_checkpoint = head_state.finalized_checkpoint(); let finalized_block_root = finalized_checkpoint.root; let finalized_block = store - .get_full_block(&finalized_block_root) + .get_full_block(&finalized_block_root, None) .map_err(|e| format!("Error loading finalized block: {:?}", e))? .ok_or_else(|| { format!( diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index cc45a6bb9a..7d7fc323e9 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -93,7 +93,6 @@ impl BeaconChain { ChunkWriter::::new(&self.store.cold_db, prev_block_slot.as_usize())?; let mut cold_batch = Vec::with_capacity(blocks.len()); - let mut hot_batch = Vec::with_capacity(blocks.len()); for block in blocks_to_import.iter().rev() { // Check chain integrity. @@ -109,7 +108,7 @@ impl BeaconChain { // Store block in the hot database without payload. self.store - .blinded_block_as_kv_store_ops(&block_root, block, &mut hot_batch); + .blinded_block_as_cold_kv_store_ops(&block_root, block, &mut cold_batch)?; // Store block roots, including at all skip slots in the freezer DB. for slot in (block.slot().as_usize()..prev_block_slot.as_usize()).rev() { @@ -177,10 +176,7 @@ impl BeaconChain { drop(verify_timer); drop(sig_timer); - // Write the I/O batches to disk, writing the blocks themselves first, as it's better - // for the hot DB to contain extra blocks than for the cold DB to point to blocks that - // do not exist. - self.store.hot_db.do_atomically(hot_batch)?; + // Write the I/O batch to disk. self.store.cold_db.do_atomically(cold_batch)?; // Update the anchor. diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 84ef0acdee..ced469eda2 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -502,7 +502,7 @@ impl, Cold: ItemStore> BackgroundMigrator block.state_root(), Ok(None) => { return Err(BeaconStateError::MissingBeaconBlock(head_hash.into()).into()) diff --git a/beacon_node/beacon_chain/src/pre_finalization_cache.rs b/beacon_node/beacon_chain/src/pre_finalization_cache.rs index 112394bb18..ca957af213 100644 --- a/beacon_node/beacon_chain/src/pre_finalization_cache.rs +++ b/beacon_node/beacon_chain/src/pre_finalization_cache.rs @@ -71,7 +71,7 @@ impl BeaconChain { } // 2. Check on disk. - if self.store.get_blinded_block(&block_root)?.is_some() { + if self.store.get_blinded_block(&block_root, None)?.is_some() { cache.block_roots.put(block_root, ()); return Ok(true); } diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs index 17ca06dd46..4db49cf2df 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs @@ -39,7 +39,7 @@ pub fn upgrade_to_v12( .unrealized_justified_checkpoint .root; let justified_block = db - .get_blinded_block(&justified_block_root)? + .get_blinded_block(&justified_block_root, None)? .ok_or_else(|| { Error::SchemaMigrationError(format!( "unrealized justified block missing for migration: {justified_block_root:?}", diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs index e494bb8764..2519a132e8 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs @@ -70,7 +70,7 @@ pub fn upgrade_to_v20( .zip(ssz_head_tracker.slots) { let block = db - .get_blinded_block(&head_block_root)? + .get_blinded_block(&head_block_root, Some(head_state_slot))? .ok_or(Error::BlockNotFound(head_block_root))?; let head_state_root = block.state_root(); diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v9.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v9.rs index e2c48d5c89..2135e5f689 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v9.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v9.rs @@ -91,7 +91,7 @@ pub fn upgrade_to_v9( Ok(None) => return Err(Error::BlockNotFound(block_root)), // There was an error reading a pre-v9 block. Try reading it as a post-v9 block. Err(_) => { - if db.try_get_full_block(&block_root)?.is_some() { + if db.try_get_full_block(&block_root, None)?.is_some() { // The block is present as a post-v9 block, assume that it was already // correctly migrated. continue; diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index a01847a0e1..031ad8ea8a 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -234,7 +234,7 @@ mod test { .clone(); let committee_b = state.committee_cache(RelativeEpoch::Next).unwrap().clone(); assert!(committee_a != committee_b); - (Arc::new(committee_a), Arc::new(committee_b)) + (committee_a, committee_b) } /// Builds a deterministic but incoherent shuffling ID from a `u64`. diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index bc6b556f2a..9307c0d1a4 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -566,7 +566,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("4") .takes_value(true) ) - /* * Misc. */ diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index dbafb61bd0..5b1a6b7846 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -28,6 +28,6 @@ sloggers = { version = "2.1.1", features = ["json"] } directory = { path = "../../common/directory" } tree_hash = "0.4.0" take-until = "0.1.0" -zstd = "0.10.0" +zstd = "0.11.0" strum = { version = "0.24.0", features = ["derive"] } bls = { path = "../../crypto/bls" } diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 1faec2e60b..5d257ead9d 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -30,12 +30,16 @@ pub struct StoreConfig { pub compact_on_prune: bool, /// Whether to prune payloads on initialization and finalization. pub prune_payloads: bool, + /// Whether to store finalized blocks in the freezer database. + pub separate_blocks: bool, } /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params. #[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] pub struct OnDiskStoreConfig { pub slots_per_restore_point: u64, + // FIXME(sproul): schema migration + pub separate_blocks: bool, } #[derive(Debug, Clone)] @@ -56,6 +60,7 @@ impl Default for StoreConfig { compact_on_init: false, compact_on_prune: true, prune_payloads: true, + separate_blocks: true, } } } @@ -64,6 +69,7 @@ impl StoreConfig { pub fn as_disk_config(&self) -> OnDiskStoreConfig { OnDiskStoreConfig { slots_per_restore_point: self.slots_per_restore_point, + separate_blocks: self.separate_blocks, } } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 5c86280cc5..9545eec373 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -7,7 +7,10 @@ use crate::config::{ }; use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator}; use crate::hot_state_iter::HotStateRootIter; -use crate::impls::beacon_state::{get_full_state, store_full_state}; +use crate::impls::{ + beacon_state::{get_full_state, store_full_state}, + frozen_block_slot::FrozenBlockSlot, +}; use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator}; use crate::leveldb_store::{BytesKey, LevelDB}; use crate::memory_store::MemoryStore; @@ -36,14 +39,14 @@ use state_processing::{ block_replayer::PreSlotHook, BlockProcessingError, BlockReplayer, SlotProcessingError, }; use std::cmp::min; -use std::io::Read; +use std::io::{Read, Write}; use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; use std::time::Duration; use types::*; use types::{beacon_state::BeaconStateDiff, EthSpec}; -use zstd::Decoder; +use zstd::{Decoder, Encoder}; pub const MAX_PARENT_STATES_TO_CACHE: u64 = 32; @@ -106,6 +109,7 @@ pub enum HotColdDBError { MissingExecutionPayload(Hash256), MissingFullBlockExecutionPayloadPruned(Hash256, Slot), MissingAnchorInfo, + MissingFrozenBlockSlot(Hash256), HotStateSummaryError(BeaconStateError), RestorePointDecodeError(ssz::DecodeError), BlockReplayBeaconError(BeaconStateError), @@ -358,6 +362,7 @@ impl, Cold: ItemStore> HotColdDB pub fn try_get_full_block( &self, block_root: &Hash256, + slot: Option, ) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT); @@ -368,7 +373,7 @@ impl, Cold: ItemStore> HotColdDB } // Load the blinded block. - let blinded_block = match self.get_blinded_block(block_root)? { + let blinded_block = match self.get_blinded_block(block_root, slot)? { Some(block) => block, None => return Ok(None), }; @@ -414,8 +419,9 @@ impl, Cold: ItemStore> HotColdDB pub fn get_full_block( &self, block_root: &Hash256, + slot: Option, ) -> Result>, Error> { - match self.try_get_full_block(block_root)? { + match self.try_get_full_block(block_root, slot)? { Some(DatabaseBlock::Full(block)) => Ok(Some(block)), Some(DatabaseBlock::Blinded(block)) => Err( HotColdDBError::MissingFullBlockExecutionPayloadPruned(*block_root, block.slot()) @@ -455,12 +461,108 @@ impl, Cold: ItemStore> HotColdDB pub fn get_blinded_block( &self, block_root: &Hash256, - ) -> Result>>, Error> { + slot: Option, + ) -> Result>, Error> { + if let Some(slot) = slot { + if slot < self.get_split_slot() || slot == 0 { + // To the freezer DB. + self.get_cold_blinded_block_by_slot(slot) + } else { + self.get_hot_blinded_block(block_root) + } + } else { + match self.get_hot_blinded_block(block_root)? { + Some(block) => Ok(Some(block)), + None => self.get_cold_blinded_block_by_root(block_root), + } + } + } + + pub fn get_hot_blinded_block( + &self, + block_root: &Hash256, + ) -> Result>, Error> { self.get_block_with(block_root, |bytes| { SignedBeaconBlock::from_ssz_bytes(bytes, &self.spec) }) } + pub fn get_cold_blinded_block_by_root( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + // Load slot. + if let Some(FrozenBlockSlot(block_slot)) = self.cold_db.get(block_root)? { + self.get_cold_blinded_block_by_slot(block_slot) + } else { + Ok(None) + } + } + + pub fn get_cold_blinded_block_by_slot( + &self, + slot: Slot, + ) -> Result>, Error> { + let bytes = if let Some(bytes) = self.cold_db.get_bytes( + DBColumn::BeaconBlockFrozen.into(), + &slot.as_u64().to_be_bytes(), + )? { + bytes + } else { + return Ok(None); + }; + + // FIXME(sproul): dodgy compression factor estimation + let mut ssz_bytes = Vec::with_capacity(2 * bytes.len()); + let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?; + decoder + .read_to_end(&mut ssz_bytes) + .map_err(Error::Compression)?; + Ok(Some(SignedBeaconBlock::from_ssz_bytes( + &ssz_bytes, &self.spec, + )?)) + } + + pub fn put_cold_blinded_block( + &self, + block_root: &Hash256, + block: &SignedBlindedBeaconBlock, + ) -> Result<(), Error> { + let mut ops = Vec::with_capacity(2); + self.blinded_block_as_cold_kv_store_ops(block_root, block, &mut ops)?; + self.cold_db.do_atomically(ops) + } + + pub fn blinded_block_as_cold_kv_store_ops( + &self, + block_root: &Hash256, + block: &SignedBlindedBeaconBlock, + kv_store_ops: &mut Vec, + ) -> Result<(), Error> { + // Write the block root to slot mapping. + let slot = block.slot(); + kv_store_ops.push(FrozenBlockSlot(slot).as_kv_store_op(*block_root)?); + + // Write the block keyed by slot. + let db_key = get_key_for_col( + DBColumn::BeaconBlockFrozen.into(), + &slot.as_u64().to_be_bytes(), + ); + + // FIXME(sproul): fix compression estimate and level + let compression_level = 3; + let ssz_bytes = block.as_ssz_bytes(); + let mut compressed_value = Vec::with_capacity(ssz_bytes.len() / 2); + let mut encoder = + Encoder::new(&mut compressed_value, compression_level).map_err(Error::Compression)?; + encoder.write_all(&ssz_bytes).map_err(Error::Compression)?; + encoder.finish().map_err(Error::Compression)?; + + kv_store_ops.push(KeyValueStoreOp::PutKeyValue(db_key, compressed_value)); + + Ok(()) + } + /// Fetch a block from the store, ignoring which fork variant it *should* be for. pub fn get_block_any_variant>( &self, @@ -898,19 +1000,20 @@ impl, Cold: ItemStore> HotColdDB }) = self.load_hot_state_summary(state_root)? { // Load the latest block, and use it to confirm the validity of this state. - let latest_block = if let Some(block) = self.get_blinded_block(&latest_block_root)? { - block - } else { - // Dangling state, will be deleted fully once finalization advances past it. - debug!( - self.log, - "Ignoring state load for dangling state"; - "state_root" => ?state_root, - "slot" => slot, - "latest_block_root" => ?latest_block_root, - ); - return Ok(None); - }; + let latest_block = + if let Some(block) = self.get_blinded_block(&latest_block_root, None)? { + block + } else { + // Dangling state, will be deleted fully once finalization advances past it. + debug!( + self.log, + "Ignoring state load for dangling state"; + "state_root" => ?state_root, + "slot" => slot, + "latest_block_root" => ?latest_block_root, + ); + return Ok(None); + }; // On a fork boundary slot load a full state from disk. if self.spec.fork_activated_at_slot::(slot).is_some() { @@ -1847,6 +1950,7 @@ pub fn migrate_database, Cold: ItemStore>( // Copy all of the states between the new finalized state and the split slot, from the hot DB to // the cold DB. let mut hot_db_ops: Vec> = Vec::new(); + let mut cold_db_block_ops: Vec = vec![]; let state_root_iter = RootsIterator::new(&store, finalized_state); for maybe_tuple in state_root_iter.take_while(|result| match result { @@ -1888,6 +1992,16 @@ pub fn migrate_database, Cold: ItemStore>( if store.config.prune_payloads { hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root)); } + + // Copy the blinded block from the hot database to the freezer. + let blinded_block = store + .get_hot_blinded_block(&block_root)? + .ok_or(Error::BlockNotFound(block_root))?; + store.blinded_block_as_cold_kv_store_ops( + &block_root, + &blinded_block, + &mut cold_db_block_ops, + )?; } // Warning: Critical section. We have to take care not to put any of the two databases in an @@ -1901,6 +2015,7 @@ pub fn migrate_database, Cold: ItemStore>( // exceedingly rare event, this should be an acceptable tradeoff. // Flush to disk all the states that have just been migrated to the cold store. + store.cold_db.do_atomically(cold_db_block_ops)?; store.cold_db.sync()?; { diff --git a/beacon_node/store/src/impls.rs b/beacon_node/store/src/impls.rs index 736585a72a..b2af9a408e 100644 --- a/beacon_node/store/src/impls.rs +++ b/beacon_node/store/src/impls.rs @@ -1,2 +1,3 @@ pub mod beacon_state; pub mod execution_payload; +pub mod frozen_block_slot; diff --git a/beacon_node/store/src/impls/frozen_block_slot.rs b/beacon_node/store/src/impls/frozen_block_slot.rs new file mode 100644 index 0000000000..13dea82764 --- /dev/null +++ b/beacon_node/store/src/impls/frozen_block_slot.rs @@ -0,0 +1,19 @@ +use crate::{DBColumn, Error, StoreItem}; +use ssz::{Decode, Encode}; +use types::Slot; + +pub struct FrozenBlockSlot(pub Slot); + +impl StoreItem for FrozenBlockSlot { + fn db_column() -> DBColumn { + DBColumn::BeaconBlock + } + + fn as_store_bytes(&self) -> Result, Error> { + Ok(self.0.as_ssz_bytes()) + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(FrozenBlockSlot(Slot::from_ssz_bytes(bytes)?)) + } +} diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index 6d45b59544..dc7aa02a5a 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -189,7 +189,7 @@ impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> RootsIterator<'a, T, block_hash: Hash256, ) -> Result { let block = store - .get_blinded_block(&block_hash)? + .get_blinded_block(&block_hash, None)? .ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?; let state = store .get_state(&block.state_root(), Some(block.slot()))? @@ -286,7 +286,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> let block = if self.decode_any_variant { self.store.get_block_any_variant(&block_root) } else { - self.store.get_blinded_block(&block_root) + self.store.get_blinded_block(&block_root, None) }? .ok_or(Error::BlockNotFound(block_root))?; self.next_block_root = block.message().parent_root(); @@ -329,7 +329,8 @@ impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockIterator<'a, T, fn do_next(&mut self) -> Result>>, Error> { if let Some(result) = self.roots.next() { let (root, _slot) = result?; - self.roots.inner.store.get_blinded_block(&root) + // Don't use slot hint here as it could be a skipped slot. + self.roots.inner.store.get_blinded_block(&root, None) } else { Ok(None) } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 5e6a40135f..cec25ca62f 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -174,8 +174,19 @@ pub enum DBColumn { /// For data related to the database itself. #[strum(serialize = "bma")] BeaconMeta, + /// Data related to blocks. + /// + /// - Key: `Hash256` block root. + /// - Value in hot DB: SSZ-encoded blinded block. + /// - Value in cold DB: 8-byte slot of block. #[strum(serialize = "blk")] BeaconBlock, + /// Frozen beacon blocks. + /// + /// - Key: 8-byte slot. + /// - Value: ZSTD-compressed SSZ-encoded blinded block. + #[strum(serialize = "bbf")] + BeaconBlockFrozen, /// For full `BeaconState`s in the hot database (finalized or fork-boundary states). #[strum(serialize = "ste")] BeaconState, diff --git a/beacon_node/store/src/reconstruct.rs b/beacon_node/store/src/reconstruct.rs index c939fd3f51..d2cac0e4d3 100644 --- a/beacon_node/store/src/reconstruct.rs +++ b/beacon_node/store/src/reconstruct.rs @@ -77,7 +77,7 @@ where None } else { Some( - self.get_blinded_block(&block_root)? + self.get_blinded_block(&block_root, Some(slot))? .ok_or(Error::BlockNotFound(block_root))?, ) };