mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-15 02:42:38 +00:00
Separate execution payloads in the DB (#3157)
## Proposed Changes Reduce post-merge disk usage by not storing finalized execution payloads in Lighthouse's database. ⚠️ **This is achieved in a backwards-incompatible way for networks that have already merged** ⚠️. Kiln users and shadow fork enjoyers will be unable to downgrade after running the code from this PR. The upgrade migration may take several minutes to run, and can't be aborted after it begins. The main changes are: - New column in the database called `ExecPayload`, keyed by beacon block root. - The `BeaconBlock` column now stores blinded blocks only. - Lots of places that previously used full blocks now use blinded blocks, e.g. analytics APIs, block replay in the DB, etc. - On finalization: - `prune_abanonded_forks` deletes non-canonical payloads whilst deleting non-canonical blocks. - `migrate_db` deletes finalized canonical payloads whilst deleting finalized states. - Conversions between blinded and full blocks are implemented in a compositional way, duplicating some work from Sean's PR #3134. - The execution layer has a new `get_payload_by_block_hash` method that reconstructs a payload using the EE's `eth_getBlockByHash` call. - I've tested manually that it works on Kiln, using Geth and Nethermind. - This isn't necessarily the most efficient method, and new engine APIs are being discussed to improve this: https://github.com/ethereum/execution-apis/pull/146. - We're depending on the `ethers` master branch, due to lots of recent changes. We're also using a workaround for https://github.com/gakonst/ethers-rs/issues/1134. - Payload reconstruction is used in the HTTP API via `BeaconChain::get_block`, which is now `async`. Due to the `async` fn, the `blocking_json` wrapper has been removed. - Payload reconstruction is used in network RPC to serve blocks-by-{root,range} responses. Here the `async` adjustment is messier, although I think I've managed to come up with a reasonable compromise: the handlers take the `SendOnDrop` by value so that they can drop it on _task completion_ (after the `fn` returns). Still, this is introducing disk reads onto core executor threads, which may have a negative performance impact (thoughts appreciated). ## Additional Info - [x] For performance it would be great to remove the cloning of full blocks when converting them to blinded blocks to write to disk. I'm going to experiment with a `put_block` API that takes the block by value, breaks it into a blinded block and a payload, stores the blinded block, and then re-assembles the full block for the caller. - [x] We should measure the latency of blocks-by-root and blocks-by-range responses. - [x] We should add integration tests that stress the payload reconstruction (basic tests done, issue for more extensive tests: https://github.com/sigp/lighthouse/issues/3159) - [x] We should (manually) test the schema v9 migration from several prior versions, particularly as blocks have changed on disk and some migrations rely on being able to load blocks. Co-authored-by: Paul Hauner <paul@paulhauner.com>
This commit is contained in:
@@ -41,6 +41,10 @@ pub enum Error {
|
||||
computed: Hash256,
|
||||
},
|
||||
BlockReplayError(BlockReplayError),
|
||||
AddPayloadLogicError,
|
||||
ResyncRequiredForExecutionPayloadSeparation,
|
||||
SlotClockUnavailableForMigration,
|
||||
V9MigrationFailure(Hash256),
|
||||
}
|
||||
|
||||
pub trait HandleUnavailable<T> {
|
||||
|
||||
@@ -18,8 +18,8 @@ use crate::metadata::{
|
||||
};
|
||||
use crate::metrics;
|
||||
use crate::{
|
||||
get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem,
|
||||
StoreOp,
|
||||
get_key_for_col, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp,
|
||||
PartialBeaconState, StoreItem, StoreOp,
|
||||
};
|
||||
use leveldb::iterator::LevelDBIterator;
|
||||
use lru::LruCache;
|
||||
@@ -89,6 +89,8 @@ pub enum HotColdDBError {
|
||||
MissingHotStateSummary(Hash256),
|
||||
MissingEpochBoundaryState(Hash256),
|
||||
MissingSplitState(Hash256, Slot),
|
||||
MissingExecutionPayload(Hash256),
|
||||
MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
|
||||
MissingAnchorInfo,
|
||||
HotStateSummaryError(BeaconStateError),
|
||||
RestorePointDecodeError(ssz::DecodeError),
|
||||
@@ -185,6 +187,21 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
}
|
||||
}
|
||||
|
||||
// Load the previous split slot from the database (if any). This ensures we can
|
||||
// stop and restart correctly. This needs to occur *before* running any migrations
|
||||
// because some migrations load states and depend on the split.
|
||||
if let Some(split) = db.load_split()? {
|
||||
*db.split.write() = split;
|
||||
*db.anchor_info.write() = db.load_anchor_info()?;
|
||||
|
||||
info!(
|
||||
db.log,
|
||||
"Hot-Cold DB initialized";
|
||||
"split_slot" => split.slot,
|
||||
"split_state" => ?split.state_root
|
||||
);
|
||||
}
|
||||
|
||||
// Ensure that the schema version of the on-disk database matches the software.
|
||||
// If the version is mismatched, an automatic migration will be attempted.
|
||||
let db = Arc::new(db);
|
||||
@@ -206,20 +223,6 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
}
|
||||
db.store_config()?;
|
||||
|
||||
// Load the previous split slot from the database (if any). This ensures we can
|
||||
// stop and restart correctly.
|
||||
if let Some(split) = db.load_split()? {
|
||||
*db.split.write() = split;
|
||||
*db.anchor_info.write() = db.load_anchor_info()?;
|
||||
|
||||
info!(
|
||||
db.log,
|
||||
"Hot-Cold DB initialized";
|
||||
"split_slot" => split.slot,
|
||||
"split_state" => ?split.state_root
|
||||
);
|
||||
}
|
||||
|
||||
// Run a garbage collection pass.
|
||||
db.remove_garbage()?;
|
||||
|
||||
@@ -263,53 +266,150 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
block: SignedBeaconBlock<E>,
|
||||
) -> Result<(), Error> {
|
||||
// Store on disk.
|
||||
let op = self.block_as_kv_store_op(block_root, &block);
|
||||
self.hot_db.do_atomically(vec![op])?;
|
||||
|
||||
let mut ops = Vec::with_capacity(2);
|
||||
let block = self.block_as_kv_store_ops(block_root, block, &mut ops)?;
|
||||
self.hot_db.do_atomically(ops)?;
|
||||
// Update cache.
|
||||
self.block_cache.lock().put(*block_root, block);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Prepare a signed beacon block for storage in the database.
|
||||
pub fn block_as_kv_store_op(
|
||||
///
|
||||
/// Return the original block for re-use after storage. It's passed by value so it can be
|
||||
/// cracked open and have its payload extracted.
|
||||
pub fn block_as_kv_store_ops(
|
||||
&self,
|
||||
key: &Hash256,
|
||||
block: &SignedBeaconBlock<E>,
|
||||
) -> KeyValueStoreOp {
|
||||
// FIXME(altair): re-add block write/overhead metrics, or remove them
|
||||
let db_key = get_key_for_col(DBColumn::BeaconBlock.into(), key.as_bytes());
|
||||
KeyValueStoreOp::PutKeyValue(db_key, block.as_ssz_bytes())
|
||||
block: SignedBeaconBlock<E>,
|
||||
ops: &mut Vec<KeyValueStoreOp>,
|
||||
) -> Result<SignedBeaconBlock<E>, Error> {
|
||||
// Split block into blinded block and execution payload.
|
||||
let (blinded_block, payload) = block.into();
|
||||
|
||||
// Store blinded block.
|
||||
self.blinded_block_as_kv_store_ops(key, &blinded_block, ops);
|
||||
|
||||
// Store execution payload if present.
|
||||
if let Some(ref execution_payload) = payload {
|
||||
ops.push(execution_payload.as_kv_store_op(*key));
|
||||
}
|
||||
|
||||
// Re-construct block. This should always succeed.
|
||||
blinded_block
|
||||
.try_into_full_block(payload)
|
||||
.ok_or(Error::AddPayloadLogicError)
|
||||
}
|
||||
|
||||
/// Fetch a block from the store.
|
||||
pub fn get_block(&self, block_root: &Hash256) -> Result<Option<SignedBeaconBlock<E>>, Error> {
|
||||
/// Prepare a signed beacon block for storage in the datbase *without* its payload.
|
||||
pub fn blinded_block_as_kv_store_ops(
|
||||
&self,
|
||||
key: &Hash256,
|
||||
blinded_block: &SignedBeaconBlock<E, BlindedPayload<E>>,
|
||||
ops: &mut Vec<KeyValueStoreOp>,
|
||||
) {
|
||||
let db_key = get_key_for_col(DBColumn::BeaconBlock.into(), key.as_bytes());
|
||||
ops.push(KeyValueStoreOp::PutKeyValue(
|
||||
db_key,
|
||||
blinded_block.as_ssz_bytes(),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn try_get_full_block(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
) -> Result<Option<DatabaseBlock<E>>, Error> {
|
||||
metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT);
|
||||
|
||||
// Check the cache.
|
||||
if let Some(block) = self.block_cache.lock().get(block_root) {
|
||||
metrics::inc_counter(&metrics::BEACON_BLOCK_CACHE_HIT_COUNT);
|
||||
return Ok(Some(block.clone()));
|
||||
return Ok(Some(DatabaseBlock::Full(block.clone())));
|
||||
}
|
||||
|
||||
let block = self.get_block_with(block_root, |bytes| {
|
||||
SignedBeaconBlock::from_ssz_bytes(bytes, &self.spec)
|
||||
})?;
|
||||
// Load the blinded block.
|
||||
let blinded_block = match self.get_blinded_block(block_root)? {
|
||||
Some(block) => block,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
// Add to cache.
|
||||
if let Some(ref block) = block {
|
||||
self.block_cache.lock().put(*block_root, block.clone());
|
||||
}
|
||||
// If the block is after the split point then we should have the full execution payload
|
||||
// stored in the database. Otherwise, just return the blinded block.
|
||||
// Hold the split lock so that it can't change.
|
||||
let split = self.split.read_recursive();
|
||||
|
||||
Ok(block)
|
||||
let block = if blinded_block.message().execution_payload().is_err()
|
||||
|| blinded_block.slot() >= split.slot
|
||||
{
|
||||
// Re-constructing the full block should always succeed here.
|
||||
let full_block = self.make_full_block(block_root, blinded_block)?;
|
||||
|
||||
// Add to cache.
|
||||
self.block_cache.lock().put(*block_root, full_block.clone());
|
||||
|
||||
DatabaseBlock::Full(full_block)
|
||||
} else {
|
||||
DatabaseBlock::Blinded(blinded_block)
|
||||
};
|
||||
drop(split);
|
||||
|
||||
Ok(Some(block))
|
||||
}
|
||||
|
||||
/// Fetch a block from the store, ignoring which fork variant it *should* be for.
|
||||
pub fn get_block_any_variant(
|
||||
/// Fetch a full block with execution payload from the store.
|
||||
pub fn get_full_block(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
) -> Result<Option<SignedBeaconBlock<E>>, Error> {
|
||||
match self.try_get_full_block(block_root)? {
|
||||
Some(DatabaseBlock::Full(block)) => Ok(Some(block)),
|
||||
Some(DatabaseBlock::Blinded(block)) => Err(
|
||||
HotColdDBError::MissingFullBlockExecutionPayloadPruned(*block_root, block.slot())
|
||||
.into(),
|
||||
),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a schema V8 or earlier full block by reading it and its payload from disk.
|
||||
pub fn get_full_block_prior_to_v9(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
) -> Result<Option<SignedBeaconBlock<E>>, Error> {
|
||||
self.get_block_with(block_root, |bytes| {
|
||||
SignedBeaconBlock::from_ssz_bytes(bytes, &self.spec)
|
||||
})
|
||||
}
|
||||
|
||||
/// Convert a blinded block into a full block by loading its execution payload if necessary.
|
||||
pub fn make_full_block(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
blinded_block: SignedBeaconBlock<E, BlindedPayload<E>>,
|
||||
) -> Result<SignedBeaconBlock<E>, Error> {
|
||||
if blinded_block.message().execution_payload().is_ok() {
|
||||
let execution_payload = self.get_execution_payload(block_root)?;
|
||||
blinded_block.try_into_full_block(Some(execution_payload))
|
||||
} else {
|
||||
blinded_block.try_into_full_block(None)
|
||||
}
|
||||
.ok_or(Error::AddPayloadLogicError)
|
||||
}
|
||||
|
||||
pub fn get_blinded_block(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
) -> Result<Option<SignedBeaconBlock<E, BlindedPayload<E>>>, Error> {
|
||||
self.get_block_with(block_root, |bytes| {
|
||||
SignedBeaconBlock::from_ssz_bytes(bytes, &self.spec)
|
||||
})
|
||||
}
|
||||
|
||||
/// Fetch a block from the store, ignoring which fork variant it *should* be for.
|
||||
pub fn get_block_any_variant<Payload: ExecPayload<E>>(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
) -> Result<Option<SignedBeaconBlock<E, Payload>>, Error> {
|
||||
self.get_block_with(block_root, SignedBeaconBlock::any_from_ssz_bytes)
|
||||
}
|
||||
|
||||
@@ -317,11 +417,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
///
|
||||
/// This is useful for e.g. ignoring the slot-indicated fork to forcefully load a block as if it
|
||||
/// were for a different fork.
|
||||
pub fn get_block_with(
|
||||
pub fn get_block_with<Payload: ExecPayload<E>>(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
decoder: impl FnOnce(&[u8]) -> Result<SignedBeaconBlock<E>, ssz::DecodeError>,
|
||||
) -> Result<Option<SignedBeaconBlock<E>>, Error> {
|
||||
decoder: impl FnOnce(&[u8]) -> Result<SignedBeaconBlock<E, Payload>, ssz::DecodeError>,
|
||||
) -> Result<Option<SignedBeaconBlock<E, Payload>>, Error> {
|
||||
self.hot_db
|
||||
.get_bytes(DBColumn::BeaconBlock.into(), block_root.as_bytes())?
|
||||
.map(|block_bytes| decoder(&block_bytes))
|
||||
@@ -329,6 +429,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
.map_err(|e| e.into())
|
||||
}
|
||||
|
||||
/// Load the execution payload for a block from disk.
|
||||
pub fn get_execution_payload(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
) -> Result<ExecutionPayload<E>, Error> {
|
||||
self.get_item(block_root)?
|
||||
.ok_or_else(|| HotColdDBError::MissingExecutionPayload(*block_root).into())
|
||||
}
|
||||
|
||||
/// Determine whether a block exists in the database.
|
||||
pub fn block_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
|
||||
self.hot_db
|
||||
@@ -339,7 +448,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
pub fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
|
||||
self.block_cache.lock().pop(block_root);
|
||||
self.hot_db
|
||||
.key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())
|
||||
.key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())?;
|
||||
self.hot_db
|
||||
.key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes())
|
||||
}
|
||||
|
||||
pub fn put_state_summary(
|
||||
@@ -550,24 +661,27 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
}
|
||||
|
||||
/// Convert a batch of `StoreOp` to a batch of `KeyValueStoreOp`.
|
||||
pub fn convert_to_kv_batch(&self, batch: &[StoreOp<E>]) -> Result<Vec<KeyValueStoreOp>, Error> {
|
||||
pub fn convert_to_kv_batch(
|
||||
&self,
|
||||
batch: Vec<StoreOp<E>>,
|
||||
) -> Result<Vec<KeyValueStoreOp>, Error> {
|
||||
let mut key_value_batch = Vec::with_capacity(batch.len());
|
||||
for op in batch {
|
||||
match op {
|
||||
StoreOp::PutBlock(block_root, block) => {
|
||||
key_value_batch.push(self.block_as_kv_store_op(block_root, block));
|
||||
self.block_as_kv_store_ops(&block_root, *block, &mut key_value_batch)?;
|
||||
}
|
||||
|
||||
StoreOp::PutState(state_root, state) => {
|
||||
self.store_hot_state(state_root, state, &mut key_value_batch)?;
|
||||
self.store_hot_state(&state_root, state, &mut key_value_batch)?;
|
||||
}
|
||||
|
||||
StoreOp::PutStateSummary(state_root, summary) => {
|
||||
key_value_batch.push(summary.as_kv_store_op(*state_root));
|
||||
key_value_batch.push(summary.as_kv_store_op(state_root));
|
||||
}
|
||||
|
||||
StoreOp::PutStateTemporaryFlag(state_root) => {
|
||||
key_value_batch.push(TemporaryFlag.as_kv_store_op(*state_root));
|
||||
key_value_batch.push(TemporaryFlag.as_kv_store_op(state_root));
|
||||
}
|
||||
|
||||
StoreOp::DeleteStateTemporaryFlag(state_root) => {
|
||||
@@ -592,17 +706,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key));
|
||||
}
|
||||
}
|
||||
|
||||
StoreOp::DeleteExecutionPayload(block_root) => {
|
||||
let key = get_key_for_col(DBColumn::ExecPayload.into(), block_root.as_bytes());
|
||||
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(key_value_batch)
|
||||
}
|
||||
|
||||
pub fn do_atomically(&self, batch: Vec<StoreOp<E>>) -> Result<(), Error> {
|
||||
// Update the block cache whilst holding a lock, to ensure that the cache updates atomically
|
||||
// with the database.
|
||||
let mut guard = self.block_cache.lock();
|
||||
|
||||
self.hot_db
|
||||
.do_atomically(self.convert_to_kv_batch(&batch)?)?;
|
||||
|
||||
for op in &batch {
|
||||
match op {
|
||||
StoreOp::PutBlock(block_root, block) => {
|
||||
@@ -622,8 +740,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
}
|
||||
|
||||
StoreOp::DeleteState(_, _) => (),
|
||||
|
||||
StoreOp::DeleteExecutionPayload(_) => (),
|
||||
}
|
||||
}
|
||||
|
||||
self.hot_db
|
||||
.do_atomically(self.convert_to_kv_batch(batch)?)?;
|
||||
drop(guard);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -887,34 +1012,33 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
start_slot: Slot,
|
||||
end_slot: Slot,
|
||||
end_block_hash: Hash256,
|
||||
) -> Result<Vec<SignedBeaconBlock<E>>, Error> {
|
||||
let mut blocks: Vec<SignedBeaconBlock<E>> =
|
||||
ParentRootBlockIterator::new(self, end_block_hash)
|
||||
.map(|result| result.map(|(_, block)| block))
|
||||
// Include the block at the end slot (if any), it needs to be
|
||||
// replayed in order to construct the canonical state at `end_slot`.
|
||||
.filter(|result| {
|
||||
result
|
||||
.as_ref()
|
||||
.map_or(true, |block| block.slot() <= end_slot)
|
||||
})
|
||||
// Include the block at the start slot (if any). Whilst it doesn't need to be
|
||||
// applied to the state, it contains a potentially useful state root.
|
||||
//
|
||||
// Return `true` on an `Err` so that the `collect` fails, unless the error is a
|
||||
// `BlockNotFound` error and some blocks are intentionally missing from the DB.
|
||||
// This complexity is unfortunately necessary to avoid loading the parent of the
|
||||
// oldest known block -- we can't know that we have all the required blocks until we
|
||||
// load a block with slot less than the start slot, which is impossible if there are
|
||||
// no blocks with slot less than the start slot.
|
||||
.take_while(|result| match result {
|
||||
Ok(block) => block.slot() >= start_slot,
|
||||
Err(Error::BlockNotFound(_)) => {
|
||||
self.get_oldest_block_slot() == self.spec.genesis_slot
|
||||
}
|
||||
Err(_) => true,
|
||||
})
|
||||
.collect::<Result<_, _>>()?;
|
||||
) -> Result<Vec<SignedBeaconBlock<E, BlindedPayload<E>>>, Error> {
|
||||
let mut blocks = ParentRootBlockIterator::new(self, end_block_hash)
|
||||
.map(|result| result.map(|(_, block)| block))
|
||||
// Include the block at the end slot (if any), it needs to be
|
||||
// replayed in order to construct the canonical state at `end_slot`.
|
||||
.filter(|result| {
|
||||
result
|
||||
.as_ref()
|
||||
.map_or(true, |block| block.slot() <= end_slot)
|
||||
})
|
||||
// Include the block at the start slot (if any). Whilst it doesn't need to be
|
||||
// applied to the state, it contains a potentially useful state root.
|
||||
//
|
||||
// Return `true` on an `Err` so that the `collect` fails, unless the error is a
|
||||
// `BlockNotFound` error and some blocks are intentionally missing from the DB.
|
||||
// This complexity is unfortunately necessary to avoid loading the parent of the
|
||||
// oldest known block -- we can't know that we have all the required blocks until we
|
||||
// load a block with slot less than the start slot, which is impossible if there are
|
||||
// no blocks with slot less than the start slot.
|
||||
.take_while(|result| match result {
|
||||
Ok(block) => block.slot() >= start_slot,
|
||||
Err(Error::BlockNotFound(_)) => {
|
||||
self.get_oldest_block_slot() == self.spec.genesis_slot
|
||||
}
|
||||
Err(_) => true,
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
blocks.reverse();
|
||||
Ok(blocks)
|
||||
}
|
||||
@@ -926,7 +1050,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
fn replay_blocks(
|
||||
&self,
|
||||
state: BeaconState<E>,
|
||||
blocks: Vec<SignedBeaconBlock<E>>,
|
||||
blocks: Vec<SignedBeaconBlock<E, BlindedPayload<E>>>,
|
||||
target_slot: Slot,
|
||||
state_root_iter: Option<impl Iterator<Item = Result<(Hash256, Slot), Error>>>,
|
||||
state_root_strategy: StateRootStrategy,
|
||||
@@ -956,6 +1080,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
})
|
||||
}
|
||||
|
||||
/// Get a reference to the `ChainSpec` used by the database.
|
||||
pub fn get_chain_spec(&self) -> &ChainSpec {
|
||||
&self.spec
|
||||
}
|
||||
|
||||
/// Fetch a copy of the current split slot from memory.
|
||||
pub fn get_split_slot(&self) -> Slot {
|
||||
self.split.read_recursive().slot
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
pub mod beacon_state;
|
||||
pub mod execution_payload;
|
||||
|
||||
17
beacon_node/store/src/impls/execution_payload.rs
Normal file
17
beacon_node/store/src/impls/execution_payload.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
use crate::{DBColumn, Error, StoreItem};
|
||||
use ssz::{Decode, Encode};
|
||||
use types::{EthSpec, ExecutionPayload};
|
||||
|
||||
impl<E: EthSpec> StoreItem for ExecutionPayload<E> {
|
||||
fn db_column() -> DBColumn {
|
||||
DBColumn::ExecPayload
|
||||
}
|
||||
|
||||
fn as_store_bytes(&self) -> Vec<u8> {
|
||||
self.as_ssz_bytes()
|
||||
}
|
||||
|
||||
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
|
||||
Ok(Self::from_ssz_bytes(bytes)?)
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,8 @@ use crate::{Error, HotColdDB, ItemStore};
|
||||
use std::borrow::Cow;
|
||||
use std::marker::PhantomData;
|
||||
use types::{
|
||||
typenum::Unsigned, BeaconState, BeaconStateError, EthSpec, Hash256, SignedBeaconBlock, Slot,
|
||||
typenum::Unsigned, BeaconState, BeaconStateError, BlindedPayload, EthSpec, Hash256,
|
||||
SignedBeaconBlock, Slot,
|
||||
};
|
||||
|
||||
/// Implemented for types that have ancestors (e.g., blocks, states) that may be iterated over.
|
||||
@@ -188,7 +189,7 @@ impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> RootsIterator<'a, T,
|
||||
block_hash: Hash256,
|
||||
) -> Result<Self, Error> {
|
||||
let block = store
|
||||
.get_block(&block_hash)?
|
||||
.get_blinded_block(&block_hash)?
|
||||
.ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?;
|
||||
let state = store
|
||||
.get_state(&block.state_root(), Some(block.slot()))?
|
||||
@@ -272,7 +273,10 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
}
|
||||
}
|
||||
|
||||
fn do_next(&mut self) -> Result<Option<(Hash256, SignedBeaconBlock<E>)>, Error> {
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn do_next(
|
||||
&mut self,
|
||||
) -> Result<Option<(Hash256, SignedBeaconBlock<E, BlindedPayload<E>>)>, Error> {
|
||||
// Stop once we reach the zero parent, otherwise we'll keep returning the genesis
|
||||
// block forever.
|
||||
if self.next_block_root.is_zero() {
|
||||
@@ -282,7 +286,7 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
let block = if self.decode_any_variant {
|
||||
self.store.get_block_any_variant(&block_root)
|
||||
} else {
|
||||
self.store.get_block(&block_root)
|
||||
self.store.get_blinded_block(&block_root)
|
||||
}?
|
||||
.ok_or(Error::BlockNotFound(block_root))?;
|
||||
self.next_block_root = block.message().parent_root();
|
||||
@@ -294,7 +298,7 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
|
||||
for ParentRootBlockIterator<'a, E, Hot, Cold>
|
||||
{
|
||||
type Item = Result<(Hash256, SignedBeaconBlock<E>), Error>;
|
||||
type Item = Result<(Hash256, SignedBeaconBlock<E, BlindedPayload<E>>), Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.do_next().transpose()
|
||||
@@ -322,10 +326,10 @@ impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> BlockIterator<'a, T,
|
||||
}
|
||||
}
|
||||
|
||||
fn do_next(&mut self) -> Result<Option<SignedBeaconBlock<T>>, Error> {
|
||||
fn do_next(&mut self) -> Result<Option<SignedBeaconBlock<T, BlindedPayload<T>>>, Error> {
|
||||
if let Some(result) = self.roots.next() {
|
||||
let (root, _slot) = result?;
|
||||
self.roots.inner.store.get_block(&root)
|
||||
self.roots.inner.store.get_blinded_block(&root)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
@@ -335,7 +339,7 @@ impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> BlockIterator<'a, T,
|
||||
impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> Iterator
|
||||
for BlockIterator<'a, T, Hot, Cold>
|
||||
{
|
||||
type Item = Result<SignedBeaconBlock<T>, Error>;
|
||||
type Item = Result<SignedBeaconBlock<T, BlindedPayload<T>>, Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.do_next().transpose()
|
||||
|
||||
@@ -197,6 +197,28 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
/// Iterate through all keys and values in a particular column.
|
||||
fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter {
|
||||
let start_key =
|
||||
BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes()));
|
||||
|
||||
let iter = self.db.keys_iter(self.read_options());
|
||||
iter.seek(&start_key);
|
||||
|
||||
Box::new(
|
||||
iter.take_while(move |key| key.matches_column(column))
|
||||
.map(move |bytes_key| {
|
||||
let key =
|
||||
bytes_key
|
||||
.remove_column(column)
|
||||
.ok_or(HotColdDBError::IterationError {
|
||||
unexpected_key: bytes_key,
|
||||
})?;
|
||||
Ok(key)
|
||||
}),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> ItemStore<E> for LevelDB<E> {}
|
||||
|
||||
@@ -43,6 +43,7 @@ use strum::{EnumString, IntoStaticStr};
|
||||
pub use types::*;
|
||||
|
||||
pub type ColumnIter<'a> = Box<dyn Iterator<Item = Result<(Hash256, Vec<u8>), Error>> + 'a>;
|
||||
pub type ColumnKeyIter<'a> = Box<dyn Iterator<Item = Result<Hash256, Error>> + 'a>;
|
||||
|
||||
pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
|
||||
/// Retrieve some bytes in `column` with `key`.
|
||||
@@ -77,11 +78,17 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
|
||||
/// Compact the database, freeing space used by deleted items.
|
||||
fn compact(&self) -> Result<(), Error>;
|
||||
|
||||
/// Iterate through all values in a particular column.
|
||||
/// Iterate through all keys and values in a particular column.
|
||||
fn iter_column(&self, _column: DBColumn) -> ColumnIter {
|
||||
// Default impl for non LevelDB databases
|
||||
Box::new(std::iter::empty())
|
||||
}
|
||||
|
||||
/// Iterate through all keys in a particular column.
|
||||
fn iter_column_keys(&self, _column: DBColumn) -> ColumnKeyIter {
|
||||
// Default impl for non LevelDB databases
|
||||
Box::new(std::iter::empty())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
|
||||
@@ -152,6 +159,7 @@ pub enum StoreOp<'a, E: EthSpec> {
|
||||
DeleteStateTemporaryFlag(Hash256),
|
||||
DeleteBlock(Hash256),
|
||||
DeleteState(Hash256, Option<Slot>),
|
||||
DeleteExecutionPayload(Hash256),
|
||||
}
|
||||
|
||||
/// A unique column identifier.
|
||||
@@ -172,6 +180,9 @@ pub enum DBColumn {
|
||||
/// and then made non-temporary by the deletion of their state root from this column.
|
||||
#[strum(serialize = "bst")]
|
||||
BeaconStateTemporary,
|
||||
/// Execution payloads for blocks more recent than the finalized checkpoint.
|
||||
#[strum(serialize = "exp")]
|
||||
ExecPayload,
|
||||
/// For persisting in-memory state to the database.
|
||||
#[strum(serialize = "bch")]
|
||||
BeaconChain,
|
||||
@@ -198,6 +209,12 @@ pub enum DBColumn {
|
||||
DhtEnrs,
|
||||
}
|
||||
|
||||
/// A block from the database, which might have an execution payload or not.
|
||||
pub enum DatabaseBlock<E: EthSpec> {
|
||||
Full(SignedBeaconBlock<E>),
|
||||
Blinded(SignedBeaconBlock<E, BlindedPayload<E>>),
|
||||
}
|
||||
|
||||
impl DBColumn {
|
||||
pub fn as_str(self) -> &'static str {
|
||||
self.into()
|
||||
|
||||
@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use types::{Checkpoint, Hash256, Slot};
|
||||
|
||||
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(8);
|
||||
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(9);
|
||||
|
||||
// All the keys that get stored under the `BeaconMeta` column.
|
||||
//
|
||||
|
||||
@@ -76,7 +76,7 @@ where
|
||||
None
|
||||
} else {
|
||||
Some(
|
||||
self.get_block(&block_root)?
|
||||
self.get_blinded_block(&block_root)?
|
||||
.ok_or(Error::BlockNotFound(block_root))?,
|
||||
)
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user