Merge remote-tracking branch 'michael/separate-blocks' into tree-states

This commit is contained in:
Michael Sproul
2022-10-19 14:37:30 +11:00
22 changed files with 248 additions and 82 deletions

View File

@@ -7,7 +7,10 @@ use crate::config::{
};
use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator};
use crate::hot_state_iter::HotStateRootIter;
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::impls::{
beacon_state::{get_full_state, store_full_state},
frozen_block_slot::FrozenBlockSlot,
};
use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
use crate::leveldb_store::{BytesKey, LevelDB};
use crate::memory_store::MemoryStore;
@@ -36,14 +39,14 @@ use state_processing::{
block_replayer::PreSlotHook, BlockProcessingError, BlockReplayer, SlotProcessingError,
};
use std::cmp::min;
use std::io::Read;
use std::io::{Read, Write};
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use types::*;
use types::{beacon_state::BeaconStateDiff, EthSpec};
use zstd::Decoder;
use zstd::{Decoder, Encoder};
pub const MAX_PARENT_STATES_TO_CACHE: u64 = 32;
@@ -106,6 +109,7 @@ pub enum HotColdDBError {
MissingExecutionPayload(Hash256),
MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
MissingAnchorInfo,
MissingFrozenBlockSlot(Hash256),
HotStateSummaryError(BeaconStateError),
RestorePointDecodeError(ssz::DecodeError),
BlockReplayBeaconError(BeaconStateError),
@@ -358,6 +362,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn try_get_full_block(
&self,
block_root: &Hash256,
slot: Option<Slot>,
) -> Result<Option<DatabaseBlock<E>>, Error> {
metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT);
@@ -368,7 +373,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
// Load the blinded block.
let blinded_block = match self.get_blinded_block(block_root)? {
let blinded_block = match self.get_blinded_block(block_root, slot)? {
Some(block) => block,
None => return Ok(None),
};
@@ -414,8 +419,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn get_full_block(
&self,
block_root: &Hash256,
slot: Option<Slot>,
) -> Result<Option<SignedBeaconBlock<E>>, Error> {
match self.try_get_full_block(block_root)? {
match self.try_get_full_block(block_root, slot)? {
Some(DatabaseBlock::Full(block)) => Ok(Some(block)),
Some(DatabaseBlock::Blinded(block)) => Err(
HotColdDBError::MissingFullBlockExecutionPayloadPruned(*block_root, block.slot())
@@ -455,12 +461,108 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Fetch a blinded (payload-less) block by root, consulting the hot DB and/or the
/// freezer (cold) DB depending on whether the caller knows the block's slot.
///
/// NOTE(review): this is a diff fragment with `+`/`-` markers stripped — the line
/// returning `SignedBeaconBlock<E, BlindedPayload<E>>` is the *removed* signature,
/// and the `slot: Option<Slot>` parameter plus the `SignedBlindedBeaconBlock<E>`
/// return line are its replacement. As displayed the function is not valid Rust.
pub fn get_blinded_block(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBeaconBlock<E, BlindedPayload<E>>>, Error> {
slot: Option<Slot>,
) -> Result<Option<SignedBlindedBeaconBlock<E>>, Error> {
if let Some(slot) = slot {
// Slot known: route directly. Blocks below the split slot live in the
// freezer; slot 0 is also sent there — presumably because the genesis
// block is stored frozen from the start (TODO confirm).
if slot < self.get_split_slot() || slot == 0 {
// To the freezer DB.
self.get_cold_blinded_block_by_slot(slot)
} else {
self.get_hot_blinded_block(block_root)
}
} else {
// Slot unknown: try the hot DB first, then fall back to a root-keyed
// freezer lookup (costs an extra root->slot read, see
// `get_cold_blinded_block_by_root`).
match self.get_hot_blinded_block(block_root)? {
Some(block) => Ok(Some(block)),
None => self.get_cold_blinded_block_by_root(block_root),
}
}
}
/// Fetch a blinded block from the hot database by its root.
///
/// Decodes via `SignedBeaconBlock::from_ssz_bytes`; the blinded-payload variant is
/// presumably selected by type inference from the return type — TODO confirm that
/// `SignedBlindedBeaconBlock<E>` is an alias for `SignedBeaconBlock<E, BlindedPayload<E>>`.
pub fn get_hot_blinded_block(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBlindedBeaconBlock<E>>, Error> {
self.get_block_with(block_root, |bytes| {
SignedBeaconBlock::from_ssz_bytes(bytes, &self.spec)
})
}
/// Fetch a blinded block from the freezer database by its root.
///
/// Freezer blocks are keyed by slot, so this performs two reads: first the
/// root -> slot mapping (`FrozenBlockSlot`), then the slot-keyed block itself.
/// Returns `Ok(None)` if no mapping exists for the root.
pub fn get_cold_blinded_block_by_root(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBlindedBeaconBlock<E>>, Error> {
// Load slot.
if let Some(FrozenBlockSlot(block_slot)) = self.cold_db.get(block_root)? {
self.get_cold_blinded_block_by_slot(block_slot)
} else {
Ok(None)
}
}
/// Fetch a blinded block from the freezer database by its slot.
///
/// The freezer stores blocks zstd-compressed in the `BeaconBlockFrozen` column,
/// keyed by the big-endian slot number (big-endian so that keys sort by slot).
/// Returns `Ok(None)` if no block is stored at this slot (e.g. a skipped slot).
pub fn get_cold_blinded_block_by_slot(
&self,
slot: Slot,
) -> Result<Option<SignedBlindedBeaconBlock<E>>, Error> {
let bytes = if let Some(bytes) = self.cold_db.get_bytes(
DBColumn::BeaconBlockFrozen.into(),
&slot.as_u64().to_be_bytes(),
)? {
bytes
} else {
return Ok(None);
};
// FIXME(sproul): dodgy compression factor estimation — assumes a ~2x
// compression ratio when pre-sizing the decode buffer; `read_to_end` will
// still grow the Vec if the real ratio is higher, so this is a perf hint only.
let mut ssz_bytes = Vec::with_capacity(2 * bytes.len());
let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?;
decoder
.read_to_end(&mut ssz_bytes)
.map_err(Error::Compression)?;
Ok(Some(SignedBeaconBlock::from_ssz_bytes(
&ssz_bytes, &self.spec,
)?))
}
/// Store a blinded block in the freezer database.
///
/// Writes both key-value pairs produced by `blinded_block_as_cold_kv_store_ops`
/// (the root -> slot mapping and the slot-keyed compressed block) in a single
/// atomic batch, so the mapping and the block can never be persisted separately.
pub fn put_cold_blinded_block(
&self,
block_root: &Hash256,
block: &SignedBlindedBeaconBlock<E>,
) -> Result<(), Error> {
let mut ops = Vec::with_capacity(2);
self.blinded_block_as_cold_kv_store_ops(block_root, block, &mut ops)?;
self.cold_db.do_atomically(ops)
}
/// Append the freezer-database write operations for a blinded block to `kv_store_ops`,
/// without executing them. Two ops are produced:
///
/// 1. a `FrozenBlockSlot` entry keyed by `block_root` (root -> slot lookup), and
/// 2. the zstd-compressed SSZ block bytes in the `BeaconBlockFrozen` column,
///    keyed by big-endian slot (matching the read path in
///    `get_cold_blinded_block_by_slot`).
///
/// The caller is responsible for committing the ops (see `put_cold_blinded_block`
/// and `migrate_database`).
pub fn blinded_block_as_cold_kv_store_ops(
&self,
block_root: &Hash256,
block: &SignedBlindedBeaconBlock<E>,
kv_store_ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
// Write the block root to slot mapping.
let slot = block.slot();
kv_store_ops.push(FrozenBlockSlot(slot).as_kv_store_op(*block_root)?);
// Write the block keyed by slot.
let db_key = get_key_for_col(
DBColumn::BeaconBlockFrozen.into(),
&slot.as_u64().to_be_bytes(),
);
// FIXME(sproul): fix compression estimate and level — level 3 is hard-coded
// (not taken from store config) and the output buffer assumes ~2x compression.
let compression_level = 3;
let ssz_bytes = block.as_ssz_bytes();
let mut compressed_value = Vec::with_capacity(ssz_bytes.len() / 2);
let mut encoder =
Encoder::new(&mut compressed_value, compression_level).map_err(Error::Compression)?;
encoder.write_all(&ssz_bytes).map_err(Error::Compression)?;
// `finish()` flushes the zstd frame footer; dropping the encoder without it
// would leave `compressed_value` truncated.
encoder.finish().map_err(Error::Compression)?;
kv_store_ops.push(KeyValueStoreOp::PutKeyValue(db_key, compressed_value));
Ok(())
}
/// Fetch a block from the store, ignoring which fork variant it *should* be for.
pub fn get_block_any_variant<Payload: ExecPayload<E>>(
&self,
@@ -898,19 +1000,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}) = self.load_hot_state_summary(state_root)?
{
// Load the latest block, and use it to confirm the validity of this state.
let latest_block = if let Some(block) = self.get_blinded_block(&latest_block_root)? {
block
} else {
// Dangling state, will be deleted fully once finalization advances past it.
debug!(
self.log,
"Ignoring state load for dangling state";
"state_root" => ?state_root,
"slot" => slot,
"latest_block_root" => ?latest_block_root,
);
return Ok(None);
};
let latest_block =
if let Some(block) = self.get_blinded_block(&latest_block_root, None)? {
block
} else {
// Dangling state, will be deleted fully once finalization advances past it.
debug!(
self.log,
"Ignoring state load for dangling state";
"state_root" => ?state_root,
"slot" => slot,
"latest_block_root" => ?latest_block_root,
);
return Ok(None);
};
// On a fork boundary slot load a full state from disk.
if self.spec.fork_activated_at_slot::<E>(slot).is_some() {
@@ -1847,6 +1950,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// Copy all of the states between the new finalized state and the split slot, from the hot DB to
// the cold DB.
let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();
let mut cold_db_block_ops: Vec<KeyValueStoreOp> = vec![];
let state_root_iter = RootsIterator::new(&store, finalized_state);
for maybe_tuple in state_root_iter.take_while(|result| match result {
@@ -1888,6 +1992,16 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
if store.config.prune_payloads {
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
}
// Copy the blinded block from the hot database to the freezer.
let blinded_block = store
.get_hot_blinded_block(&block_root)?
.ok_or(Error::BlockNotFound(block_root))?;
store.blinded_block_as_cold_kv_store_ops(
&block_root,
&blinded_block,
&mut cold_db_block_ops,
)?;
}
// Warning: Critical section. We have to take care not to put any of the two databases in an
@@ -1901,6 +2015,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// exceedingly rare event, this should be an acceptable tradeoff.
// Flush to disk all the states that have just been migrated to the cold store.
store.cold_db.do_atomically(cold_db_block_ops)?;
store.cold_db.sync()?;
{