Work in progress block separation

This commit is contained in:
Michael Sproul
2022-09-16 17:32:22 +10:00
parent 7d3948c8fe
commit 2bd784ef68
21 changed files with 204 additions and 29 deletions

42
Cargo.lock generated
View File

@@ -733,6 +733,9 @@ name = "cc"
version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11"
dependencies = [
"jobserver",
]
[[package]]
name = "cexpr"
@@ -3023,6 +3026,15 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d"
[[package]]
name = "jobserver"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.59"
@@ -6338,6 +6350,7 @@ dependencies = [
"strum",
"tempfile",
"types",
"zstd",
]
[[package]]
@@ -7919,3 +7932,32 @@ dependencies = [
"thiserror",
"time 0.1.44",
]
[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "5.0.2+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db"
dependencies = [
"libc",
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b"
dependencies = [
"cc",
"libc",
]

View File

@@ -693,7 +693,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let root = self.block_root_at_slot(request_slot, skips)?;
if let Some(block_root) = root {
Ok(self.store.get_blinded_block(&block_root)?)
Ok(self
.store
.get_blinded_block(&block_root, Some(request_slot))?)
} else {
Ok(None)
}
@@ -919,7 +921,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<Option<SignedBeaconBlock<T::EthSpec>>, Error> {
// Load block from database, returning immediately if we have the full block w payload
// stored.
let blinded_block = match self.store.try_get_full_block(block_root)? {
let blinded_block = match self.store.try_get_full_block(block_root, None)? {
Some(DatabaseBlock::Full(block)) => return Ok(Some(block)),
Some(DatabaseBlock::Blinded(block)) => block,
None => return Ok(None),
@@ -975,7 +977,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
block_root: &Hash256,
) -> Result<Option<SignedBlindedBeaconBlock<T::EthSpec>>, Error> {
Ok(self.store.get_blinded_block(block_root)?)
Ok(self.store.get_blinded_block(block_root, None)?)
}
/// Returns the state at the given root, if any.
@@ -4629,7 +4631,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let beacon_block = self
.store
.get_blinded_block(&beacon_block_root)?
.get_blinded_block(&beacon_block_root, None)?
.ok_or_else(|| {
Error::DBInconsistent(format!("Missing block {}", beacon_block_root))
})?;

View File

@@ -322,7 +322,7 @@ where
metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES);
let justified_block = self
.store
.get_blinded_block(&self.justified_checkpoint.root)
.get_blinded_block(&self.justified_checkpoint.root, None)
.map_err(Error::FailedToReadBlock)?
.ok_or(Error::MissingBlock(self.justified_checkpoint.root))?
.deconstruct()

View File

@@ -256,7 +256,7 @@ where
.ok_or("Fork choice not found in store")?;
let genesis_block = store
.get_blinded_block(&chain.genesis_block_root)
.get_blinded_block(&chain.genesis_block_root, Some(Slot::new(0)))
.map_err(|e| descriptive_db_error("genesis block", &e))?
.ok_or("Genesis block not found in store")?;
let genesis_state = store
@@ -618,7 +618,7 @@ where
// Try to decode the head block according to the current fork, if that fails, try
// to backtrack to before the most recent fork.
let (head_block_root, head_block, head_reverted) =
match store.get_full_block(&initial_head_block_root) {
match store.get_full_block(&initial_head_block_root, None) {
Ok(Some(block)) => (initial_head_block_root, block, false),
Ok(None) => return Err("Head block not found in store".into()),
Err(StoreError::SszDecodeError(_)) => {

View File

@@ -269,7 +269,7 @@ impl<T: BeaconChainTypes> CanonicalHead<T> {
let fork_choice_view = fork_choice.cached_fork_choice_view();
let beacon_block_root = fork_choice_view.head_block_root;
let beacon_block = store
.get_full_block(&beacon_block_root)?
.get_full_block(&beacon_block_root, None)?
.ok_or(Error::MissingBeaconBlock(beacon_block_root))?;
let beacon_state_root = beacon_block.state_root();
let beacon_state = store
@@ -639,7 +639,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.unwrap_or_else(|| {
let beacon_block = self
.store
.get_full_block(&new_view.head_block_root)?
.get_full_block(&new_view.head_block_root, None)?
.ok_or(Error::MissingBeaconBlock(new_view.head_block_root))?;
let beacon_state_root = beacon_block.state_root();

View File

@@ -107,7 +107,7 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
let finalized_checkpoint = head_state.finalized_checkpoint();
let finalized_block_root = finalized_checkpoint.root;
let finalized_block = store
.get_full_block(&finalized_block_root)
.get_full_block(&finalized_block_root, None)
.map_err(|e| format!("Error loading finalized block: {:?}", e))?
.ok_or_else(|| {
format!(

View File

@@ -93,7 +93,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
let mut cold_batch = Vec::with_capacity(blocks.len());
let mut hot_batch = Vec::with_capacity(blocks.len());
for block in blocks_to_import.iter().rev() {
// Check chain integrity.
@@ -109,7 +108,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Store block in the hot database without payload.
self.store
.blinded_block_as_kv_store_ops(&block_root, block, &mut hot_batch);
.blinded_block_as_cold_kv_store_ops(&block_root, block, &mut cold_batch);
// Store block roots, including at all skip slots in the freezer DB.
for slot in (block.slot().as_usize()..prev_block_slot.as_usize()).rev() {
@@ -177,10 +176,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
drop(verify_timer);
drop(sig_timer);
// Write the I/O batches to disk, writing the blocks themselves first, as it's better
// for the hot DB to contain extra blocks than for the cold DB to point to blocks that
// do not exist.
self.store.hot_db.do_atomically(hot_batch)?;
// Write the I/O batch to disk.
self.store.cold_db.do_atomically(cold_batch)?;
// Update the anchor.

View File

@@ -424,7 +424,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
// so delete it from the head tracker but leave it and its states in the database
// This is suboptimal as it wastes disk space, but it's difficult to fix. A re-sync
// can be used to reclaim the space.
let head_state_root = match store.get_blinded_block(&head_hash) {
let head_state_root = match store.get_blinded_block(&head_hash, Some(head_slot)) {
Ok(Some(block)) => block.state_root(),
Ok(None) => {
return Err(BeaconStateError::MissingBeaconBlock(head_hash.into()).into())

View File

@@ -71,7 +71,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
// 2. Check on disk.
if self.store.get_blinded_block(&block_root)?.is_some() {
if self.store.get_blinded_block(&block_root, None)?.is_some() {
cache.block_roots.put(block_root, ());
return Ok(true);
}

View File

@@ -39,7 +39,7 @@ pub fn upgrade_to_v12<T: BeaconChainTypes>(
.unrealized_justified_checkpoint
.root;
let justified_block = db
.get_blinded_block(&justified_block_root)?
.get_blinded_block(&justified_block_root, None)?
.ok_or_else(|| {
Error::SchemaMigrationError(format!(
"unrealized justified block missing for migration: {justified_block_root:?}",

View File

@@ -91,7 +91,7 @@ pub fn upgrade_to_v9<T: BeaconChainTypes>(
Ok(None) => return Err(Error::BlockNotFound(block_root)),
// There was an error reading a pre-v9 block. Try reading it as a post-v9 block.
Err(_) => {
if db.try_get_full_block(&block_root)?.is_some() {
if db.try_get_full_block(&block_root, None)?.is_some() {
// The block is present as a post-v9 block, assume that it was already
// correctly migrated.
continue;

View File

@@ -26,3 +26,4 @@ lru = "0.7.1"
sloggers = { version = "2.1.1", features = ["json"] }
directory = { path = "../../common/directory" }
strum = { version = "0.24.0", features = ["derive"] }
zstd = "0.11.0"

View File

@@ -21,12 +21,16 @@ pub struct StoreConfig {
pub compact_on_init: bool,
/// Whether to compact the database during database pruning.
pub compact_on_prune: bool,
/// Whether to store finalized blocks in the freezer database.
pub separate_blocks: bool,
}
/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
pub struct OnDiskStoreConfig {
pub slots_per_restore_point: u64,
// FIXME(sproul): schema migration
pub separate_blocks: bool,
}
#[derive(Debug, Clone)]
@@ -43,6 +47,7 @@ impl Default for StoreConfig {
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
compact_on_init: false,
compact_on_prune: true,
separate_blocks: true,
}
}
}
@@ -51,6 +56,7 @@ impl StoreConfig {
pub fn as_disk_config(&self) -> OnDiskStoreConfig {
OnDiskStoreConfig {
slots_per_restore_point: self.slots_per_restore_point,
separate_blocks: self.separate_blocks,
}
}

View File

@@ -41,6 +41,7 @@ pub enum Error {
computed: Hash256,
},
BlockReplayError(BlockReplayError),
Compression(std::io::Error),
AddPayloadLogicError,
ResyncRequiredForExecutionPayloadSeparation,
SlotClockUnavailableForMigration,

View File

@@ -6,7 +6,10 @@ use crate::config::{
PREV_DEFAULT_SLOTS_PER_RESTORE_POINT,
};
use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator};
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::impls::{
beacon_state::{get_full_state, store_full_state},
frozen_block_slot::FrozenBlockSlot,
};
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
use crate::leveldb_store::BytesKey;
use crate::leveldb_store::LevelDB;
@@ -33,11 +36,13 @@ use state_processing::{
};
use std::cmp::min;
use std::convert::TryInto;
use std::io::{Read, Write};
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use types::*;
use zstd::{Decoder, Encoder};
/// On-disk database that stores finalized states efficiently.
///
@@ -92,6 +97,7 @@ pub enum HotColdDBError {
MissingExecutionPayload(Hash256),
MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
MissingAnchorInfo,
MissingFrozenBlockSlot(Hash256),
HotStateSummaryError(BeaconStateError),
RestorePointDecodeError(ssz::DecodeError),
BlockReplayBeaconError(BeaconStateError),
@@ -318,6 +324,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn try_get_full_block(
&self,
block_root: &Hash256,
slot: Option<Slot>,
) -> Result<Option<DatabaseBlock<E>>, Error> {
metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT);
@@ -328,7 +335,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
// Load the blinded block.
let blinded_block = match self.get_blinded_block(block_root)? {
let blinded_block = match self.get_blinded_block(block_root, slot)? {
Some(block) => block,
None => return Ok(None),
};
@@ -360,8 +367,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn get_full_block(
&self,
block_root: &Hash256,
slot: Option<Slot>,
) -> Result<Option<SignedBeaconBlock<E>>, Error> {
match self.try_get_full_block(block_root)? {
match self.try_get_full_block(block_root, slot)? {
Some(DatabaseBlock::Full(block)) => Ok(Some(block)),
Some(DatabaseBlock::Blinded(block)) => Err(
HotColdDBError::MissingFullBlockExecutionPayloadPruned(*block_root, block.slot())
@@ -399,12 +407,98 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn get_blinded_block(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBeaconBlock<E, BlindedPayload<E>>>, Error> {
slot: Option<Slot>,
) -> Result<Option<SignedBlindedBeaconBlock<E>>, Error> {
if let Some(slot) = slot {
if slot < self.get_split_slot() {
// To the freezer DB.
self.get_cold_blinded_block_by_slot(slot)
} else {
self.get_hot_blinded_block(block_root)
}
} else {
match self.get_hot_blinded_block(block_root)? {
Some(block) => Ok(Some(block)),
None => self.get_cold_blinded_block_by_root(block_root),
}
}
}
pub fn get_hot_blinded_block(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBlindedBeaconBlock<E>>, Error> {
self.get_block_with(block_root, |bytes| {
SignedBeaconBlock::from_ssz_bytes(bytes, &self.spec)
})
}
pub fn get_cold_blinded_block_by_root(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBlindedBeaconBlock<E>>, Error> {
// Load slot.
if let Some(FrozenBlockSlot(block_slot)) = self.cold_db.get(block_root)? {
self.get_cold_blinded_block_by_slot(block_slot)
} else {
Ok(None)
}
}
pub fn get_cold_blinded_block_by_slot(
&self,
slot: Slot,
) -> Result<Option<SignedBlindedBeaconBlock<E>>, Error> {
let bytes = if let Some(bytes) = self.cold_db.get_bytes(
DBColumn::BeaconBlockFrozen.into(),
&slot.as_u64().to_be_bytes(),
)? {
bytes
} else {
return Ok(None);
};
// FIXME(sproul): dodgy compression factor estimation
let mut ssz_bytes = Vec::with_capacity(2 * bytes.len());
let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?;
decoder
.read_to_end(&mut ssz_bytes)
.map_err(Error::Compression)?;
Ok(Some(SignedBeaconBlock::from_ssz_bytes(
&ssz_bytes, &self.spec,
)?))
}
pub fn blinded_block_as_cold_kv_store_ops(
&self,
block_root: &Hash256,
block: &SignedBlindedBeaconBlock<E>,
kv_store_ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
// Write the block root to slot mapping.
let slot = block.slot();
kv_store_ops.push(FrozenBlockSlot(slot).as_kv_store_op(*block_root));
// Write the block keyed by slot.
let db_key = get_key_for_col(
DBColumn::BeaconBlockFrozen.into(),
&slot.as_u64().to_be_bytes(),
);
// FIXME(sproul): fix compression estimate and level
let compression_level = 3;
let ssz_bytes = block.as_ssz_bytes();
let mut compressed_value = Vec::with_capacity(ssz_bytes.len() / 2);
let mut encoder =
Encoder::new(&mut compressed_value, compression_level).map_err(Error::Compression)?;
encoder.write_all(&ssz_bytes).map_err(Error::Compression)?;
encoder.finish().map_err(Error::Compression)?;
kv_store_ops.push(KeyValueStoreOp::PutKeyValue(db_key, compressed_value));
Ok(())
}
/// Fetch a block from the store, ignoring which fork variant it *should* be for.
pub fn get_block_any_variant<Payload: ExecPayload<E>>(
&self,
@@ -1455,6 +1549,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
}
let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();
let mut cold_db_block_ops: Vec<KeyValueStoreOps> = vec![];
// 1. Copy all of the states between the head and the split slot, from the hot DB
// to the cold DB.

View File

@@ -1,2 +1,3 @@
pub mod beacon_state;
pub mod execution_payload;
pub mod frozen_block_slot;

View File

@@ -0,0 +1,19 @@
use crate::{DBColumn, Error, StoreItem};
use ssz::{Decode, Encode};
use types::Slot;
pub struct FrozenBlockSlot(pub Slot);
impl StoreItem for FrozenBlockSlot {
fn db_column() -> DBColumn {
DBColumn::BeaconBlock
}
fn as_store_bytes(&self) -> Vec<u8> {
self.0.as_ssz_bytes()
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(FrozenBlockSlot(Slot::from_ssz_bytes(bytes)?))
}
}

View File

@@ -189,7 +189,7 @@ impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> RootsIterator<'a, T,
block_hash: Hash256,
) -> Result<Self, Error> {
let block = store
.get_blinded_block(&block_hash)?
.get_blinded_block(&block_hash, None)?
.ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?;
let state = store
.get_state(&block.state_root(), Some(block.slot()))?
@@ -286,7 +286,7 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
let block = if self.decode_any_variant {
self.store.get_block_any_variant(&block_root)
} else {
self.store.get_blinded_block(&block_root)
self.store.get_blinded_block(&block_root, None)
}?
.ok_or(Error::BlockNotFound(block_root))?;
self.next_block_root = block.message().parent_root();
@@ -329,7 +329,8 @@ impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> BlockIterator<'a, T,
fn do_next(&mut self) -> Result<Option<SignedBeaconBlock<T, BlindedPayload<T>>>, Error> {
if let Some(result) = self.roots.next() {
let (root, _slot) = result?;
self.roots.inner.store.get_blinded_block(&root)
// Don't use slot hint here as it could be a skipped slot.
self.roots.inner.store.get_blinded_block(&root, None)
} else {
Ok(None)
}

View File

@@ -169,8 +169,19 @@ pub enum DBColumn {
/// For data related to the database itself.
#[strum(serialize = "bma")]
BeaconMeta,
/// Data related to blocks.
///
/// - Key: `Hash256` block root.
/// - Value in hot DB: SSZ-encoded blinded block.
/// - Value in cold DB: 8-byte slot of block.
#[strum(serialize = "blk")]
BeaconBlock,
/// Frozen beacon blocks.
///
/// - Key: 8-byte slot.
/// - Value: ZSTD-compressed SSZ-encoded blinded block.
#[strum(serialize = "bbf")]
BeaconBlockFrozen,
/// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
#[strum(serialize = "ste")]
BeaconState,

View File

@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use types::{Checkpoint, Hash256, Slot};
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(12);
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(8000);
// All the keys that get stored under the `BeaconMeta` column.
//

View File

@@ -76,7 +76,7 @@ where
None
} else {
Some(
self.get_blinded_block(&block_root)?
self.get_blinded_block(&block_root, Some(slot))?
.ok_or(Error::BlockNotFound(block_root))?,
)
};