Merge remote-tracking branch 'origin/unstable' into tree-states

This commit is contained in:
Michael Sproul
2022-05-24 10:01:05 +10:00
237 changed files with 8506 additions and 3598 deletions

View File

@@ -4,7 +4,8 @@ use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use types::{EthSpec, MinimalEthSpec};
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192;
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 64;
pub const DEFAULT_STATE_CACHE_SIZE: usize = 128;
pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
@@ -15,6 +16,8 @@ const EST_COMPRESSION_FACTOR: usize = 2;
pub struct StoreConfig {
/// Number of slots to wait between storing restore points in the freezer database.
pub slots_per_restore_point: u64,
/// Flag indicating whether the `slots_per_restore_point` was set explicitly by the user.
pub slots_per_restore_point_set_explicitly: bool,
/// Maximum number of blocks to store in the in-memory block cache.
pub block_cache_size: usize,
/// Maximum number of states to store in the in-memory state cache.
@@ -44,6 +47,7 @@ impl Default for StoreConfig {
Self {
// Safe default for tests, shouldn't ever be read by a CLI node.
slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64,
slots_per_restore_point_set_explicitly: false,
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
state_cache_size: DEFAULT_STATE_CACHE_SIZE,
compression_level: DEFAULT_COMPRESSION_LEVEL,

View File

@@ -54,6 +54,10 @@ pub enum Error {
state_root: Hash256,
slot: Slot,
},
AddPayloadLogicError,
ResyncRequiredForExecutionPayloadSeparation,
SlotClockUnavailableForMigration,
V9MigrationFailure(Hash256),
}
pub trait HandleUnavailable<T> {

View File

@@ -1,7 +1,10 @@
use crate::chunked_vector::{
store_updated_vector, BlockRoots, HistoricalRoots, RandaoMixes, StateRoots,
};
use crate::config::{OnDiskStoreConfig, StoreConfig};
use crate::config::{
OnDiskStoreConfig, StoreConfig, DEFAULT_SLOTS_PER_RESTORE_POINT,
PREV_DEFAULT_SLOTS_PER_RESTORE_POINT,
};
use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator};
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
@@ -16,8 +19,8 @@ use crate::metadata::{
use crate::metrics;
use crate::state_cache::StateCache;
use crate::{
get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem,
StoreOp,
get_key_for_col, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp,
PartialBeaconState, StoreItem, StoreOp,
};
use leveldb::iterator::LevelDBIterator;
use lru::LruCache;
@@ -92,6 +95,8 @@ pub enum HotColdDBError {
MissingPrevState(Hash256),
MissingSplitState(Hash256, Slot),
MissingStateDiff(Hash256),
MissingExecutionPayload(Hash256),
MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
MissingAnchorInfo,
HotStateSummaryError(BeaconStateError),
RestorePointDecodeError(ssz::DecodeError),
@@ -159,7 +164,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
config.verify_compression_level()?;
let db = Arc::new(HotColdDB {
let mut db = HotColdDB {
split: RwLock::new(Split::default()),
anchor_info: RwLock::new(None),
cold_db: LevelDB::open(cold_path)?,
@@ -170,10 +175,46 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
spec,
log,
_phantom: PhantomData,
});
};
// Allow the slots-per-restore-point value to stay at the previous default if the config
// uses the new default. Don't error on a failed read because the config itself may need
// migrating.
if let Ok(Some(disk_config)) = db.load_config() {
if !db.config.slots_per_restore_point_set_explicitly
&& disk_config.slots_per_restore_point == PREV_DEFAULT_SLOTS_PER_RESTORE_POINT
&& db.config.slots_per_restore_point == DEFAULT_SLOTS_PER_RESTORE_POINT
{
debug!(
db.log,
"Ignoring slots-per-restore-point config in favour of on-disk value";
"config" => db.config.slots_per_restore_point,
"on_disk" => disk_config.slots_per_restore_point,
);
// Mutate the in-memory config so that it's compatible.
db.config.slots_per_restore_point = PREV_DEFAULT_SLOTS_PER_RESTORE_POINT;
}
}
// Load the previous split slot from the database (if any). This ensures we can
// stop and restart correctly. This needs to occur *before* running any migrations
// because some migrations load states and depend on the split.
if let Some(split) = db.load_split()? {
*db.split.write() = split;
*db.anchor_info.write() = db.load_anchor_info()?;
info!(
db.log,
"Hot-Cold DB initialized";
"split_slot" => split.slot,
"split_state" => ?split.state_root
);
}
// Ensure that the schema version of the on-disk database matches the software.
// If the version is mismatched, an automatic migration will be attempted.
let db = Arc::new(db);
if let Some(schema_version) = db.load_schema_version()? {
debug!(
db.log,
@@ -192,21 +233,6 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
}
db.store_config()?;
// Load the previous split slot from the database (if any). This ensures we can
// stop and restart correctly.
if let Some(split) = db.load_split()? {
*db.split.write() = split;
*db.anchor_info.write() = db.load_anchor_info()?;
info!(
db.log,
"Hot-Cold DB initialized";
"version" => CURRENT_SCHEMA_VERSION.as_u64(),
"split_slot" => split.slot,
"split_state" => format!("{:?}", split.state_root)
);
}
// Run a garbage collection pass.
db.remove_garbage()?;
@@ -266,53 +292,150 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
block: SignedBeaconBlock<E>,
) -> Result<(), Error> {
// Store on disk.
let op = self.block_as_kv_store_op(block_root, &block);
self.hot_db.do_atomically(vec![op])?;
let mut ops = Vec::with_capacity(2);
let block = self.block_as_kv_store_ops(block_root, block, &mut ops)?;
self.hot_db.do_atomically(ops)?;
// Update cache.
self.block_cache.lock().put(*block_root, block);
Ok(())
}
/// Prepare a signed beacon block for storage in the database.
pub fn block_as_kv_store_op(
///
/// Return the original block for re-use after storage. It's passed by value so it can be
/// cracked open and have its payload extracted.
pub fn block_as_kv_store_ops(
&self,
key: &Hash256,
block: &SignedBeaconBlock<E>,
) -> KeyValueStoreOp {
// FIXME(altair): re-add block write/overhead metrics, or remove them
let db_key = get_key_for_col(DBColumn::BeaconBlock.into(), key.as_bytes());
KeyValueStoreOp::PutKeyValue(db_key, block.as_ssz_bytes())
block: SignedBeaconBlock<E>,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<SignedBeaconBlock<E>, Error> {
// Split block into blinded block and execution payload.
let (blinded_block, payload) = block.into();
// Store blinded block.
self.blinded_block_as_kv_store_ops(key, &blinded_block, ops);
// Store execution payload if present.
if let Some(ref execution_payload) = payload {
ops.push(execution_payload.as_kv_store_op(*key)?);
}
// Re-construct block. This should always succeed.
blinded_block
.try_into_full_block(payload)
.ok_or(Error::AddPayloadLogicError)
}
/// Fetch a block from the store.
pub fn get_block(&self, block_root: &Hash256) -> Result<Option<SignedBeaconBlock<E>>, Error> {
/// Prepare a signed beacon block for storage in the datbase *without* its payload.
pub fn blinded_block_as_kv_store_ops(
&self,
key: &Hash256,
blinded_block: &SignedBeaconBlock<E, BlindedPayload<E>>,
ops: &mut Vec<KeyValueStoreOp>,
) {
let db_key = get_key_for_col(DBColumn::BeaconBlock.into(), key.as_bytes());
ops.push(KeyValueStoreOp::PutKeyValue(
db_key,
blinded_block.as_ssz_bytes(),
));
}
pub fn try_get_full_block(
&self,
block_root: &Hash256,
) -> Result<Option<DatabaseBlock<E>>, Error> {
metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT);
// Check the cache.
if let Some(block) = self.block_cache.lock().get(block_root) {
metrics::inc_counter(&metrics::BEACON_BLOCK_CACHE_HIT_COUNT);
return Ok(Some(block.clone()));
return Ok(Some(DatabaseBlock::Full(block.clone())));
}
let block = self.get_block_with(block_root, |bytes| {
SignedBeaconBlock::from_ssz_bytes(bytes, &self.spec)
})?;
// Load the blinded block.
let blinded_block = match self.get_blinded_block(block_root)? {
Some(block) => block,
None => return Ok(None),
};
// Add to cache.
if let Some(ref block) = block {
self.block_cache.lock().put(*block_root, block.clone());
}
// If the block is after the split point then we should have the full execution payload
// stored in the database. Otherwise, just return the blinded block.
// Hold the split lock so that it can't change.
let split = self.split.read_recursive();
Ok(block)
let block = if blinded_block.message().execution_payload().is_err()
|| blinded_block.slot() >= split.slot
{
// Re-constructing the full block should always succeed here.
let full_block = self.make_full_block(block_root, blinded_block)?;
// Add to cache.
self.block_cache.lock().put(*block_root, full_block.clone());
DatabaseBlock::Full(full_block)
} else {
DatabaseBlock::Blinded(blinded_block)
};
drop(split);
Ok(Some(block))
}
/// Fetch a block from the store, ignoring which fork variant it *should* be for.
pub fn get_block_any_variant(
/// Fetch a full block with execution payload from the store.
pub fn get_full_block(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBeaconBlock<E>>, Error> {
match self.try_get_full_block(block_root)? {
Some(DatabaseBlock::Full(block)) => Ok(Some(block)),
Some(DatabaseBlock::Blinded(block)) => Err(
HotColdDBError::MissingFullBlockExecutionPayloadPruned(*block_root, block.slot())
.into(),
),
None => Ok(None),
}
}
/// Get a schema V8 or earlier full block by reading it and its payload from disk.
pub fn get_full_block_prior_to_v9(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBeaconBlock<E>>, Error> {
self.get_block_with(block_root, |bytes| {
SignedBeaconBlock::from_ssz_bytes(bytes, &self.spec)
})
}
/// Convert a blinded block into a full block by loading its execution payload if necessary.
pub fn make_full_block(
&self,
block_root: &Hash256,
blinded_block: SignedBeaconBlock<E, BlindedPayload<E>>,
) -> Result<SignedBeaconBlock<E>, Error> {
if blinded_block.message().execution_payload().is_ok() {
let execution_payload = self.get_execution_payload(block_root)?;
blinded_block.try_into_full_block(Some(execution_payload))
} else {
blinded_block.try_into_full_block(None)
}
.ok_or(Error::AddPayloadLogicError)
}
pub fn get_blinded_block(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBeaconBlock<E, BlindedPayload<E>>>, Error> {
self.get_block_with(block_root, |bytes| {
SignedBeaconBlock::from_ssz_bytes(bytes, &self.spec)
})
}
/// Fetch a block from the store, ignoring which fork variant it *should* be for.
pub fn get_block_any_variant<Payload: ExecPayload<E>>(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBeaconBlock<E, Payload>>, Error> {
self.get_block_with(block_root, SignedBeaconBlock::any_from_ssz_bytes)
}
@@ -320,11 +443,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
///
/// This is useful for e.g. ignoring the slot-indicated fork to forcefully load a block as if it
/// were for a different fork.
pub fn get_block_with(
pub fn get_block_with<Payload: ExecPayload<E>>(
&self,
block_root: &Hash256,
decoder: impl FnOnce(&[u8]) -> Result<SignedBeaconBlock<E>, ssz::DecodeError>,
) -> Result<Option<SignedBeaconBlock<E>>, Error> {
decoder: impl FnOnce(&[u8]) -> Result<SignedBeaconBlock<E, Payload>, ssz::DecodeError>,
) -> Result<Option<SignedBeaconBlock<E, Payload>>, Error> {
self.hot_db
.get_bytes(DBColumn::BeaconBlock.into(), block_root.as_bytes())?
.map(|block_bytes| decoder(&block_bytes))
@@ -332,6 +455,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.map_err(|e| e.into())
}
/// Load the execution payload for a block from disk.
pub fn get_execution_payload(
&self,
block_root: &Hash256,
) -> Result<ExecutionPayload<E>, Error> {
self.get_item(block_root)?
.ok_or_else(|| HotColdDBError::MissingExecutionPayload(*block_root).into())
}
/// Determine whether a block exists in the database.
pub fn block_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
self.hot_db
@@ -342,7 +474,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
self.block_cache.lock().pop(block_root);
self.hot_db
.key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())
.key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())?;
self.hot_db
.key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes())
}
pub fn put_state_summary(
@@ -499,7 +633,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
for op in batch {
match op {
StoreOp::PutBlock(block_root, block) => {
key_value_batch.push(self.block_as_kv_store_op(&block_root, &block));
self.block_as_kv_store_ops(&block_root, *block, &mut key_value_batch)?;
}
StoreOp::PutState(state_root, state) => {
@@ -541,12 +675,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}
StoreOp::KeyValueOp(kv_op) => key_value_batch.push(kv_op),
StoreOp::DeleteExecutionPayload(block_root) => {
let key = get_key_for_col(DBColumn::ExecPayload.into(), block_root.as_bytes());
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
}
}
}
Ok(key_value_batch)
}
pub fn do_atomically(&self, batch: Vec<StoreOp<E>>) -> Result<(), Error> {
// Update the block cache whilst holding a lock, to ensure that the cache updates atomically
// with the database.
let mut block_cache = self.block_cache.lock();
for op in &batch {
@@ -571,8 +711,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
StoreOp::KeyValueOp(_) => (),
StoreOp::DeleteExecutionPayload(_) => (),
}
}
self.hot_db
.do_atomically(self.convert_to_kv_batch(batch)?)?;
drop(block_cache);
@@ -713,7 +856,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}) = self.load_hot_state_summary(state_root)?
{
// Load the latest block, and use it to confirm the validity of this state.
let latest_block = if let Some(block) = self.get_block(&latest_block_root)? {
let latest_block = if let Some(block) = self.get_blinded_block(&latest_block_root)? {
block
} else {
// Dangling state, will be deleted fully once finalization advances past it.
@@ -989,34 +1132,33 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
start_slot: Slot,
end_slot: Slot,
end_block_hash: Hash256,
) -> Result<Vec<SignedBeaconBlock<E>>, Error> {
let mut blocks: Vec<SignedBeaconBlock<E>> =
ParentRootBlockIterator::new(self, end_block_hash)
.map(|result| result.map(|(_, block)| block))
// Include the block at the end slot (if any), it needs to be
// replayed in order to construct the canonical state at `end_slot`.
.filter(|result| {
result
.as_ref()
.map_or(true, |block| block.slot() <= end_slot)
})
// Include the block at the start slot (if any). Whilst it doesn't need to be
// applied to the state, it contains a potentially useful state root.
//
// Return `true` on an `Err` so that the `collect` fails, unless the error is a
// `BlockNotFound` error and some blocks are intentionally missing from the DB.
// This complexity is unfortunately necessary to avoid loading the parent of the
// oldest known block -- we can't know that we have all the required blocks until we
// load a block with slot less than the start slot, which is impossible if there are
// no blocks with slot less than the start slot.
.take_while(|result| match result {
Ok(block) => block.slot() >= start_slot,
Err(Error::BlockNotFound(_)) => {
self.get_oldest_block_slot() == self.spec.genesis_slot
}
Err(_) => true,
})
.collect::<Result<_, _>>()?;
) -> Result<Vec<SignedBeaconBlock<E, BlindedPayload<E>>>, Error> {
let mut blocks = ParentRootBlockIterator::new(self, end_block_hash)
.map(|result| result.map(|(_, block)| block))
// Include the block at the end slot (if any), it needs to be
// replayed in order to construct the canonical state at `end_slot`.
.filter(|result| {
result
.as_ref()
.map_or(true, |block| block.slot() <= end_slot)
})
// Include the block at the start slot (if any). Whilst it doesn't need to be
// applied to the state, it contains a potentially useful state root.
//
// Return `true` on an `Err` so that the `collect` fails, unless the error is a
// `BlockNotFound` error and some blocks are intentionally missing from the DB.
// This complexity is unfortunately necessary to avoid loading the parent of the
// oldest known block -- we can't know that we have all the required blocks until we
// load a block with slot less than the start slot, which is impossible if there are
// no blocks with slot less than the start slot.
.take_while(|result| match result {
Ok(block) => block.slot() >= start_slot,
Err(Error::BlockNotFound(_)) => {
self.get_oldest_block_slot() == self.spec.genesis_slot
}
Err(_) => true,
})
.collect::<Result<Vec<_>, _>>()?;
blocks.reverse();
Ok(blocks)
}
@@ -1028,7 +1170,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn replay_blocks(
&self,
state: BeaconState<E>,
blocks: Vec<SignedBeaconBlock<E>>,
blocks: Vec<SignedBeaconBlock<E, BlindedPayload<E>>>,
target_slot: Slot,
state_root_iter: impl Iterator<Item = Result<(Hash256, Slot), Error>>,
) -> Result<BeaconState<E>, Error> {
@@ -1048,6 +1190,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
})
}
/// Get a reference to the `ChainSpec` used by the database.
pub fn get_chain_spec(&self) -> &ChainSpec {
&self.spec
}
/// Fetch a copy of the current split slot from memory.
pub fn get_split_slot(&self) -> Slot {
self.split.read_recursive().slot
@@ -1227,6 +1374,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.map_or(self.spec.genesis_slot, |anchor| anchor.oldest_block_slot)
}
/// Return the in-memory configuration used by the database.
pub fn get_config(&self) -> &StoreConfig {
&self.config
}
/// Load previously-stored config from disk.
fn load_config(&self) -> Result<Option<OnDiskStoreConfig>, Error> {
self.hot_db.get(&CONFIG_KEY)

View File

@@ -1 +1,2 @@
pub mod beacon_state;
pub mod execution_payload;

View File

@@ -0,0 +1,17 @@
use crate::{DBColumn, Error, StoreItem};
use ssz::{Decode, Encode};
use types::{EthSpec, ExecutionPayload};
impl<E: EthSpec> StoreItem for ExecutionPayload<E> {
fn db_column() -> DBColumn {
DBColumn::ExecPayload
}
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}

View File

@@ -3,7 +3,8 @@ use crate::{Error, HotColdDB, ItemStore};
use std::borrow::Cow;
use std::marker::PhantomData;
use types::{
typenum::Unsigned, BeaconState, BeaconStateError, EthSpec, Hash256, SignedBeaconBlock, Slot,
typenum::Unsigned, BeaconState, BeaconStateError, BlindedPayload, EthSpec, Hash256,
SignedBeaconBlock, Slot,
};
/// Implemented for types that have ancestors (e.g., blocks, states) that may be iterated over.
@@ -188,7 +189,7 @@ impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> RootsIterator<'a, T,
block_hash: Hash256,
) -> Result<Self, Error> {
let block = store
.get_block(&block_hash)?
.get_blinded_block(&block_hash)?
.ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?;
let state = store
.get_state(&block.state_root(), Some(block.slot()))?
@@ -272,7 +273,10 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
}
}
fn do_next(&mut self) -> Result<Option<(Hash256, SignedBeaconBlock<E>)>, Error> {
#[allow(clippy::type_complexity)]
fn do_next(
&mut self,
) -> Result<Option<(Hash256, SignedBeaconBlock<E, BlindedPayload<E>>)>, Error> {
// Stop once we reach the zero parent, otherwise we'll keep returning the genesis
// block forever.
if self.next_block_root.is_zero() {
@@ -282,7 +286,7 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
let block = if self.decode_any_variant {
self.store.get_block_any_variant(&block_root)
} else {
self.store.get_block(&block_root)
self.store.get_blinded_block(&block_root)
}?
.ok_or(Error::BlockNotFound(block_root))?;
self.next_block_root = block.message().parent_root();
@@ -294,7 +298,7 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
for ParentRootBlockIterator<'a, E, Hot, Cold>
{
type Item = Result<(Hash256, SignedBeaconBlock<E>), Error>;
type Item = Result<(Hash256, SignedBeaconBlock<E, BlindedPayload<E>>), Error>;
fn next(&mut self) -> Option<Self::Item> {
self.do_next().transpose()
@@ -322,10 +326,10 @@ impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> BlockIterator<'a, T,
}
}
fn do_next(&mut self) -> Result<Option<SignedBeaconBlock<T>>, Error> {
fn do_next(&mut self) -> Result<Option<SignedBeaconBlock<T, BlindedPayload<T>>>, Error> {
if let Some(result) = self.roots.next() {
let (root, _slot) = result?;
self.roots.inner.store.get_block(&root)
self.roots.inner.store.get_blinded_block(&root)
} else {
Ok(None)
}
@@ -335,7 +339,7 @@ impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> BlockIterator<'a, T,
impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> Iterator
for BlockIterator<'a, T, Hot, Cold>
{
type Item = Result<SignedBeaconBlock<T>, Error>;
type Item = Result<SignedBeaconBlock<T, BlindedPayload<T>>, Error>;
fn next(&mut self) -> Option<Self::Item> {
self.do_next().transpose()

View File

@@ -178,10 +178,7 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
}
/// Iterate through all keys and values in a particular column.
fn iter_column<'a>(
&'a self,
column: DBColumn,
) -> Box<dyn Iterator<Item = Result<(Hash256, Vec<u8>), Error>> + 'a> {
fn iter_column(&self, column: DBColumn) -> ColumnIter {
let start_key =
BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes()));
@@ -201,6 +198,28 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
}),
)
}
/// Iterate through all keys and values in a particular column.
fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter {
let start_key =
BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes()));
let iter = self.db.keys_iter(self.read_options());
iter.seek(&start_key);
Box::new(
iter.take_while(move |key| key.matches_column(column))
.map(move |bytes_key| {
let key =
bytes_key
.remove_column(column)
.ok_or(HotColdDBError::IterationError {
unexpected_key: bytes_key,
})?;
Ok(key)
}),
)
}
}
impl<E: EthSpec> ItemStore<E> for LevelDB<E> {}

View File

@@ -45,6 +45,7 @@ use strum::{EnumString, IntoStaticStr};
pub use types::*;
pub type ColumnIter<'a> = Box<dyn Iterator<Item = Result<(Hash256, Vec<u8>), Error>> + 'a>;
pub type ColumnKeyIter<'a> = Box<dyn Iterator<Item = Result<Hash256, Error>> + 'a>;
pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
/// Retrieve some bytes in `column` with `key`.
@@ -79,11 +80,17 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
/// Compact the database, freeing space used by deleted items.
fn compact(&self) -> Result<(), Error>;
/// Iterate through all values in a particular column.
/// Iterate through all keys and values in a particular column.
fn iter_column(&self, _column: DBColumn) -> ColumnIter {
// Default impl for non LevelDB databases
Box::new(std::iter::empty())
}
/// Iterate through all keys in a particular column.
fn iter_column_keys(&self, _column: DBColumn) -> ColumnKeyIter {
// Default impl for non LevelDB databases
Box::new(std::iter::empty())
}
}
pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
@@ -153,6 +160,7 @@ pub enum StoreOp<'a, E: EthSpec> {
DeleteStateTemporaryFlag(Hash256),
DeleteBlock(Hash256),
DeleteState(Hash256, Option<Slot>),
DeleteExecutionPayload(Hash256),
KeyValueOp(KeyValueStoreOp),
}
@@ -177,6 +185,9 @@ pub enum DBColumn {
/// and then made non-temporary by the deletion of their state root from this column.
#[strum(serialize = "bst")]
BeaconStateTemporary,
/// Execution payloads for blocks more recent than the finalized checkpoint.
#[strum(serialize = "exp")]
ExecPayload,
/// For persisting in-memory state to the database.
#[strum(serialize = "bch")]
BeaconChain,
@@ -203,6 +214,12 @@ pub enum DBColumn {
DhtEnrs,
}
/// A block from the database, which might have an execution payload or not.
pub enum DatabaseBlock<E: EthSpec> {
Full(SignedBeaconBlock<E>),
Blinded(SignedBeaconBlock<E, BlindedPayload<E>>),
}
impl DBColumn {
pub fn as_str(self) -> &'static str {
self.into()

View File

@@ -77,7 +77,7 @@ where
None
} else {
Some(
self.get_block(&block_root)?
self.get_blinded_block(&block_root)?
.ok_or(Error::BlockNotFound(block_root))?,
)
};