mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-02 16:21:42 +00:00
Make the block cache optional (#8066)
Address contention on the store's `block_cache` by allowing it to be disabled when `--block-cache-size 0` is provided, and also making this the default. Co-Authored-By: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
@@ -779,7 +779,7 @@ pub fn cli_app() -> Command {
|
||||
.long("block-cache-size")
|
||||
.value_name("SIZE")
|
||||
.help("Specifies how many blocks the database should cache in memory")
|
||||
.default_value("5")
|
||||
.default_value("0")
|
||||
.action(ArgAction::Set)
|
||||
.display_order(0)
|
||||
)
|
||||
|
||||
@@ -19,7 +19,7 @@ pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::LevelDb;
|
||||
pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
|
||||
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192;
|
||||
pub const DEFAULT_EPOCHS_PER_STATE_DIFF: u64 = 8;
|
||||
pub const DEFAULT_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(64);
|
||||
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 0;
|
||||
pub const DEFAULT_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(128);
|
||||
pub const DEFAULT_STATE_CACHE_HEADROOM: NonZeroUsize = new_non_zero_usize(1);
|
||||
pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
|
||||
@@ -34,7 +34,7 @@ pub const DEFAULT_BLOB_PUNE_MARGIN_EPOCHS: u64 = 0;
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct StoreConfig {
|
||||
/// Maximum number of blocks to store in the in-memory block cache.
|
||||
pub block_cache_size: NonZeroUsize,
|
||||
pub block_cache_size: usize,
|
||||
/// Maximum number of states to store in the in-memory state cache.
|
||||
pub state_cache_size: NonZeroUsize,
|
||||
/// Minimum number of states to cull from the state cache upon fullness.
|
||||
|
||||
@@ -70,7 +70,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
/// The hot database also contains all blocks.
|
||||
pub hot_db: Hot,
|
||||
/// LRU cache of deserialized blocks and blobs. Updated whenever a block or blob is loaded.
|
||||
block_cache: Mutex<BlockCache<E>>,
|
||||
block_cache: Option<Mutex<BlockCache<E>>>,
|
||||
/// Cache of beacon states.
|
||||
///
|
||||
/// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required.
|
||||
@@ -229,7 +229,9 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
|
||||
cold_db: MemoryStore::open(),
|
||||
blobs_db: MemoryStore::open(),
|
||||
hot_db: MemoryStore::open(),
|
||||
block_cache: Mutex::new(BlockCache::new(config.block_cache_size)),
|
||||
block_cache: NonZeroUsize::new(config.block_cache_size)
|
||||
.map(BlockCache::new)
|
||||
.map(Mutex::new),
|
||||
state_cache: Mutex::new(StateCache::new(
|
||||
config.state_cache_size,
|
||||
config.state_cache_headroom,
|
||||
@@ -281,7 +283,9 @@ impl<E: EthSpec> HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>> {
|
||||
blobs_db: BeaconNodeBackend::open(&config, blobs_db_path)?,
|
||||
cold_db: BeaconNodeBackend::open(&config, cold_path)?,
|
||||
hot_db,
|
||||
block_cache: Mutex::new(BlockCache::new(config.block_cache_size)),
|
||||
block_cache: NonZeroUsize::new(config.block_cache_size)
|
||||
.map(BlockCache::new)
|
||||
.map(Mutex::new),
|
||||
state_cache: Mutex::new(StateCache::new(
|
||||
config.state_cache_size,
|
||||
config.state_cache_headroom,
|
||||
@@ -488,14 +492,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
pub fn register_metrics(&self) {
|
||||
let hsc_metrics = self.historic_state_cache.lock().metrics();
|
||||
|
||||
metrics::set_gauge(
|
||||
&metrics::STORE_BEACON_BLOCK_CACHE_SIZE,
|
||||
self.block_cache.lock().block_cache.len() as i64,
|
||||
);
|
||||
metrics::set_gauge(
|
||||
&metrics::STORE_BEACON_BLOB_CACHE_SIZE,
|
||||
self.block_cache.lock().blob_cache.len() as i64,
|
||||
);
|
||||
if let Some(block_cache) = &self.block_cache {
|
||||
let cache = block_cache.lock();
|
||||
metrics::set_gauge(
|
||||
&metrics::STORE_BEACON_BLOCK_CACHE_SIZE,
|
||||
cache.block_cache.len() as i64,
|
||||
);
|
||||
metrics::set_gauge(
|
||||
&metrics::STORE_BEACON_BLOB_CACHE_SIZE,
|
||||
cache.blob_cache.len() as i64,
|
||||
);
|
||||
}
|
||||
let state_cache = self.state_cache.lock();
|
||||
metrics::set_gauge(
|
||||
&metrics::STORE_BEACON_STATE_CACHE_SIZE,
|
||||
@@ -553,7 +560,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
let block = self.block_as_kv_store_ops(block_root, block, &mut ops)?;
|
||||
self.hot_db.do_atomically(ops)?;
|
||||
// Update cache.
|
||||
self.block_cache.lock().put_block(*block_root, block);
|
||||
self.block_cache
|
||||
.as_ref()
|
||||
.inspect(|cache| cache.lock().put_block(*block_root, block));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -605,7 +614,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT);
|
||||
|
||||
// Check the cache.
|
||||
if let Some(block) = self.block_cache.lock().get_block(block_root) {
|
||||
if let Some(cache) = &self.block_cache
|
||||
&& let Some(block) = cache.lock().get_block(block_root)
|
||||
{
|
||||
metrics::inc_counter(&metrics::BEACON_BLOCK_CACHE_HIT_COUNT);
|
||||
return Ok(Some(DatabaseBlock::Full(block.clone())));
|
||||
}
|
||||
@@ -630,8 +641,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
// Add to cache.
|
||||
self.block_cache
|
||||
.lock()
|
||||
.put_block(*block_root, full_block.clone());
|
||||
.as_ref()
|
||||
.inspect(|cache| cache.lock().put_block(*block_root, full_block.clone()));
|
||||
|
||||
DatabaseBlock::Full(full_block)
|
||||
} else if !self.config.prune_payloads {
|
||||
@@ -902,7 +913,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
/// Delete a block from the store and the block cache.
|
||||
pub fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
|
||||
self.block_cache.lock().delete(block_root);
|
||||
self.block_cache
|
||||
.as_ref()
|
||||
.inspect(|cache| cache.lock().delete(block_root));
|
||||
self.hot_db
|
||||
.key_delete(DBColumn::BeaconBlock, block_root.as_slice())?;
|
||||
self.hot_db
|
||||
@@ -917,7 +930,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
block_root.as_slice(),
|
||||
&blobs.as_ssz_bytes(),
|
||||
)?;
|
||||
self.block_cache.lock().put_blobs(*block_root, blobs);
|
||||
self.block_cache
|
||||
.as_ref()
|
||||
.inspect(|cache| cache.lock().put_blobs(*block_root, blobs));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -945,9 +960,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
self.blobs_db
|
||||
.put(&DATA_COLUMN_CUSTODY_INFO_KEY, &data_column_custody_info)?;
|
||||
|
||||
self.block_cache
|
||||
.lock()
|
||||
.put_data_column_custody_info(Some(data_column_custody_info));
|
||||
self.block_cache.as_ref().inspect(|cache| {
|
||||
cache
|
||||
.lock()
|
||||
.put_data_column_custody_info(Some(data_column_custody_info))
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -964,8 +981,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
&data_column.as_ssz_bytes(),
|
||||
)?;
|
||||
self.block_cache
|
||||
.lock()
|
||||
.put_data_column(*block_root, data_column);
|
||||
.as_ref()
|
||||
.inspect(|cache| cache.lock().put_data_column(*block_root, data_column));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -1399,7 +1416,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
// Update database whilst holding a lock on cache, to ensure that the cache updates
|
||||
// atomically with the database.
|
||||
let mut guard = self.block_cache.lock();
|
||||
let guard = self.block_cache.as_ref().map(|cache| cache.lock());
|
||||
|
||||
let blob_cache_ops = blobs_ops.clone();
|
||||
// Try to execute blobs store ops.
|
||||
@@ -1446,56 +1463,67 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
for op in hot_db_cache_ops {
|
||||
// Delete from the state cache.
|
||||
for op in &hot_db_cache_ops {
|
||||
match op {
|
||||
StoreOp::PutBlock(block_root, block) => {
|
||||
guard.put_block(block_root, (*block).clone());
|
||||
}
|
||||
|
||||
StoreOp::PutBlobs(_, _) => (),
|
||||
|
||||
StoreOp::PutDataColumns(_, _) => (),
|
||||
|
||||
StoreOp::PutState(_, _) => (),
|
||||
|
||||
StoreOp::PutStateSummary(_, _) => (),
|
||||
|
||||
StoreOp::DeleteBlock(block_root) => {
|
||||
guard.delete_block(&block_root);
|
||||
self.state_cache.lock().delete_block_states(&block_root);
|
||||
self.state_cache.lock().delete_block_states(block_root);
|
||||
}
|
||||
|
||||
StoreOp::DeleteState(state_root, _) => {
|
||||
self.state_cache.lock().delete_state(&state_root)
|
||||
self.state_cache.lock().delete_state(state_root)
|
||||
}
|
||||
|
||||
StoreOp::DeleteBlobs(_) => (),
|
||||
|
||||
StoreOp::DeleteDataColumns(_, _) => (),
|
||||
|
||||
StoreOp::DeleteExecutionPayload(_) => (),
|
||||
|
||||
StoreOp::DeleteSyncCommitteeBranch(_) => (),
|
||||
|
||||
StoreOp::KeyValueOp(_) => (),
|
||||
}
|
||||
}
|
||||
|
||||
for op in blob_cache_ops {
|
||||
match op {
|
||||
StoreOp::PutBlobs(block_root, blobs) => {
|
||||
guard.put_blobs(block_root, blobs);
|
||||
}
|
||||
|
||||
StoreOp::DeleteBlobs(block_root) => {
|
||||
guard.delete_blobs(&block_root);
|
||||
}
|
||||
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
drop(guard);
|
||||
// If the block cache is enabled, also delete from the block cache.
|
||||
if let Some(mut guard) = guard {
|
||||
for op in hot_db_cache_ops {
|
||||
match op {
|
||||
StoreOp::PutBlock(block_root, block) => {
|
||||
guard.put_block(block_root, (*block).clone());
|
||||
}
|
||||
|
||||
StoreOp::PutBlobs(_, _) => (),
|
||||
|
||||
StoreOp::PutDataColumns(_, _) => (),
|
||||
|
||||
StoreOp::PutState(_, _) => (),
|
||||
|
||||
StoreOp::PutStateSummary(_, _) => (),
|
||||
|
||||
StoreOp::DeleteBlock(block_root) => {
|
||||
guard.delete_block(&block_root);
|
||||
}
|
||||
|
||||
StoreOp::DeleteState(_, _) => (),
|
||||
|
||||
StoreOp::DeleteBlobs(_) => (),
|
||||
|
||||
StoreOp::DeleteDataColumns(_, _) => (),
|
||||
|
||||
StoreOp::DeleteExecutionPayload(_) => (),
|
||||
|
||||
StoreOp::DeleteSyncCommitteeBranch(_) => (),
|
||||
|
||||
StoreOp::KeyValueOp(_) => (),
|
||||
}
|
||||
}
|
||||
|
||||
for op in blob_cache_ops {
|
||||
match op {
|
||||
StoreOp::PutBlobs(block_root, blobs) => {
|
||||
guard.put_blobs(block_root, blobs);
|
||||
}
|
||||
|
||||
StoreOp::DeleteBlobs(block_root) => {
|
||||
guard.delete_blobs(&block_root);
|
||||
}
|
||||
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2425,21 +2453,23 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
/// If custody info doesn't exist in the cache,
|
||||
/// try to fetch from the DB and prime the cache.
|
||||
pub fn get_data_column_custody_info(&self) -> Result<Option<DataColumnCustodyInfo>, Error> {
|
||||
let Some(data_column_custody_info) = self.block_cache.lock().get_data_column_custody_info()
|
||||
else {
|
||||
let data_column_custody_info = self
|
||||
.blobs_db
|
||||
.get::<DataColumnCustodyInfo>(&DATA_COLUMN_CUSTODY_INFO_KEY)?;
|
||||
if let Some(cache) = &self.block_cache
|
||||
&& let Some(data_column_custody_info) = cache.lock().get_data_column_custody_info()
|
||||
{
|
||||
return Ok(Some(data_column_custody_info));
|
||||
}
|
||||
let data_column_custody_info = self
|
||||
.blobs_db
|
||||
.get::<DataColumnCustodyInfo>(&DATA_COLUMN_CUSTODY_INFO_KEY)?;
|
||||
|
||||
// Update the cache
|
||||
self.block_cache
|
||||
// Update the cache
|
||||
self.block_cache.as_ref().inspect(|cache| {
|
||||
cache
|
||||
.lock()
|
||||
.put_data_column_custody_info(data_column_custody_info.clone());
|
||||
.put_data_column_custody_info(data_column_custody_info.clone())
|
||||
});
|
||||
|
||||
return Ok(data_column_custody_info);
|
||||
};
|
||||
|
||||
Ok(Some(data_column_custody_info))
|
||||
Ok(data_column_custody_info)
|
||||
}
|
||||
|
||||
/// Fetch all columns for a given block from the store.
|
||||
@@ -2460,9 +2490,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
/// Fetch blobs for a given block from the store.
|
||||
pub fn get_blobs(&self, block_root: &Hash256) -> Result<BlobSidecarListFromRoot<E>, Error> {
|
||||
// Check the cache.
|
||||
if let Some(blobs) = self.block_cache.lock().get_blobs(block_root) {
|
||||
if let Some(blobs) = self
|
||||
.block_cache
|
||||
.as_ref()
|
||||
.and_then(|cache| cache.lock().get_blobs(block_root).cloned())
|
||||
{
|
||||
metrics::inc_counter(&metrics::BEACON_BLOBS_CACHE_HIT_COUNT);
|
||||
return Ok(blobs.clone().into());
|
||||
return Ok(blobs.into());
|
||||
}
|
||||
|
||||
match self
|
||||
@@ -2481,8 +2515,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
{
|
||||
let blobs = BlobSidecarList::new(blobs, max_blobs_per_block as usize)?;
|
||||
self.block_cache
|
||||
.lock()
|
||||
.put_blobs(*block_root, blobs.clone());
|
||||
.as_ref()
|
||||
.inspect(|cache| cache.lock().put_blobs(*block_root, blobs.clone()));
|
||||
|
||||
Ok(BlobSidecarListFromRoot::Blobs(blobs))
|
||||
} else {
|
||||
@@ -2515,8 +2549,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
// Check the cache.
|
||||
if let Some(data_column) = self
|
||||
.block_cache
|
||||
.lock()
|
||||
.get_data_column(block_root, column_index)
|
||||
.as_ref()
|
||||
.and_then(|cache| cache.lock().get_data_column(block_root, column_index))
|
||||
{
|
||||
metrics::inc_counter(&metrics::BEACON_DATA_COLUMNS_CACHE_HIT_COUNT);
|
||||
return Ok(Some(data_column));
|
||||
@@ -2528,9 +2562,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
)? {
|
||||
Some(ref data_column_bytes) => {
|
||||
let data_column = Arc::new(DataColumnSidecar::from_ssz_bytes(data_column_bytes)?);
|
||||
self.block_cache
|
||||
.lock()
|
||||
.put_data_column(*block_root, data_column.clone());
|
||||
self.block_cache.as_ref().inspect(|cache| {
|
||||
cache
|
||||
.lock()
|
||||
.put_data_column(*block_root, data_column.clone())
|
||||
});
|
||||
Ok(Some(data_column))
|
||||
}
|
||||
None => Ok(None),
|
||||
@@ -3264,11 +3300,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
}
|
||||
|
||||
// Remove deleted blobs from the cache.
|
||||
let mut block_cache = self.block_cache.lock();
|
||||
for block_root in removed_block_roots {
|
||||
block_cache.delete_blobs(&block_root);
|
||||
if let Some(mut block_cache) = self.block_cache.as_ref().map(|cache| cache.lock()) {
|
||||
for block_root in removed_block_roots {
|
||||
block_cache.delete_blobs(&block_root);
|
||||
}
|
||||
}
|
||||
drop(block_cache);
|
||||
|
||||
let new_blob_info = BlobInfo {
|
||||
oldest_blob_slot: Some(end_slot + 1),
|
||||
|
||||
@@ -22,7 +22,7 @@ Options:
|
||||
Data directory for the blobs database.
|
||||
--block-cache-size <SIZE>
|
||||
Specifies how many blocks the database should cache in memory
|
||||
[default: 5]
|
||||
[default: 0]
|
||||
--boot-nodes <ENR/MULTIADDR LIST>
|
||||
One or more comma-delimited base64-encoded ENR's to bootstrap the p2p
|
||||
network. Multiaddr is also supported.
|
||||
|
||||
@@ -1839,12 +1839,25 @@ fn slots_per_restore_point_flag() {
|
||||
.run_with_zero_port();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn block_cache_size_default() {
|
||||
CommandLineTest::new()
|
||||
.run_with_zero_port()
|
||||
.with_config(|config| assert_eq!(config.store.block_cache_size, 0));
|
||||
}
|
||||
#[test]
|
||||
fn block_cache_size_flag() {
|
||||
CommandLineTest::new()
|
||||
.flag("block-cache-size", Some("4"))
|
||||
.run_with_zero_port()
|
||||
.with_config(|config| assert_eq!(config.store.block_cache_size, new_non_zero_usize(4)));
|
||||
.with_config(|config| assert_eq!(config.store.block_cache_size, 4));
|
||||
}
|
||||
#[test]
|
||||
fn block_cache_size_zero() {
|
||||
CommandLineTest::new()
|
||||
.flag("block-cache-size", Some("0"))
|
||||
.run_with_zero_port()
|
||||
.with_config(|config| assert_eq!(config.store.block_cache_size, 0));
|
||||
}
|
||||
#[test]
|
||||
fn state_cache_size_default() {
|
||||
|
||||
Reference in New Issue
Block a user