mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-06 10:11:44 +00:00
Compare commits
9 Commits
mallory-ol
...
import-blo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d6c2d66f17 | ||
|
|
1dd4f59610 | ||
|
|
95df7ea443 | ||
|
|
582e389e74 | ||
|
|
7c905cb891 | ||
|
|
8647a3e8ff | ||
|
|
299c0b2d17 | ||
|
|
57f58b8f1a | ||
|
|
027f3ff26e |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2085,6 +2085,7 @@ dependencies = [
|
|||||||
"clap",
|
"clap",
|
||||||
"clap_utils",
|
"clap_utils",
|
||||||
"environment",
|
"environment",
|
||||||
|
"ethereum_ssz",
|
||||||
"hex",
|
"hex",
|
||||||
"serde",
|
"serde",
|
||||||
"store",
|
"store",
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ clap_utils = { workspace = true }
|
|||||||
environment = { workspace = true }
|
environment = { workspace = true }
|
||||||
hex = { workspace = true }
|
hex = { workspace = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
|
ethereum_ssz = { workspace = true }
|
||||||
store = { workspace = true }
|
store = { workspace = true }
|
||||||
strum = { workspace = true }
|
strum = { workspace = true }
|
||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ use clap_utils::FLAG_HEADER;
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use store::hdiff::HierarchyConfig;
|
use store::hdiff::HierarchyConfig;
|
||||||
|
use types::Slot;
|
||||||
|
|
||||||
use crate::InspectTarget;
|
use crate::InspectTarget;
|
||||||
|
|
||||||
@@ -80,6 +81,9 @@ pub enum DatabaseManagerSubcommand {
|
|||||||
PruneBlobs(PruneBlobs),
|
PruneBlobs(PruneBlobs),
|
||||||
PruneStates(PruneStates),
|
PruneStates(PruneStates),
|
||||||
Compact(Compact),
|
Compact(Compact),
|
||||||
|
SetOldestBlobSlot(SetOldestBlobSlot),
|
||||||
|
InspectBlobs(InspectBlobs),
|
||||||
|
ImportBlobs(ImportBlobs),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Parser, Clone, Deserialize, Serialize, Debug)]
|
#[derive(Parser, Clone, Deserialize, Serialize, Debug)]
|
||||||
@@ -228,3 +232,29 @@ pub struct Compact {
|
|||||||
)]
|
)]
|
||||||
pub output_dir: Option<PathBuf>,
|
pub output_dir: Option<PathBuf>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Parser, Clone, Deserialize, Serialize, Debug)]
|
||||||
|
#[clap(about = "Manually override the database's view of the oldest blob known.")]
|
||||||
|
pub struct SetOldestBlobSlot {
|
||||||
|
#[clap(
|
||||||
|
long,
|
||||||
|
value_name = "SLOT",
|
||||||
|
help = "Slot of the oldest blob in the database.",
|
||||||
|
display_order = 0
|
||||||
|
)]
|
||||||
|
pub slot: Slot,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Parser, Clone, Deserialize, Serialize, Debug)]
|
||||||
|
#[clap(about = "Produce a summary of blob availability in the database.")]
|
||||||
|
pub struct InspectBlobs {
|
||||||
|
#[clap(long, help = "Verify blob data integrity.", display_order = 0)]
|
||||||
|
pub verify: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Parser, Clone, Deserialize, Serialize, Debug)]
|
||||||
|
#[clap(about = "Import blobs from another node's blob database.")]
|
||||||
|
pub struct ImportBlobs {
|
||||||
|
#[clap(long, help = "Path to the database to import", display_order = 0)]
|
||||||
|
pub source_db: PathBuf,
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
pub mod cli;
|
pub mod cli;
|
||||||
|
|
||||||
use crate::cli::DatabaseManager;
|
use crate::cli::DatabaseManager;
|
||||||
use crate::cli::Migrate;
|
use crate::cli::Migrate;
|
||||||
use crate::cli::PruneStates;
|
use crate::cli::PruneStates;
|
||||||
@@ -14,17 +15,17 @@ use environment::{Environment, RuntimeContext};
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::path::PathBuf;
|
use std::path::{Path, PathBuf};
|
||||||
use store::KeyValueStore;
|
|
||||||
use store::{
|
use store::{
|
||||||
database::interface::BeaconNodeBackend,
|
database::interface::BeaconNodeBackend,
|
||||||
errors::Error,
|
errors::Error,
|
||||||
|
hot_cold_store::HotColdDBError,
|
||||||
metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION},
|
metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION},
|
||||||
DBColumn, HotColdDB,
|
BlobSidecarListFromRoot, DBColumn, HotColdDB, KeyValueStore, KeyValueStoreOp,
|
||||||
};
|
};
|
||||||
use strum::{EnumString, EnumVariantNames};
|
use strum::{EnumString, EnumVariantNames};
|
||||||
use tracing::{info, warn};
|
use tracing::{debug, info, warn};
|
||||||
use types::{BeaconState, EthSpec, Slot};
|
use types::{BeaconState, BlobSidecarList, EthSpec, Hash256, Slot};
|
||||||
|
|
||||||
fn parse_client_config<E: EthSpec>(
|
fn parse_client_config<E: EthSpec>(
|
||||||
cli_args: &ArgMatches,
|
cli_args: &ArgMatches,
|
||||||
@@ -454,6 +455,184 @@ pub fn prune_states<E: EthSpec>(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn set_oldest_blob_slot<E: EthSpec>(
|
||||||
|
slot: Slot,
|
||||||
|
client_config: ClientConfig,
|
||||||
|
runtime_context: &RuntimeContext<E>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let spec = &runtime_context.eth2_config.spec;
|
||||||
|
let hot_path = client_config.get_db_path();
|
||||||
|
let cold_path = client_config.get_freezer_db_path();
|
||||||
|
let blobs_path = client_config.get_blobs_db_path();
|
||||||
|
|
||||||
|
let db = HotColdDB::<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>::open(
|
||||||
|
&hot_path,
|
||||||
|
&cold_path,
|
||||||
|
&blobs_path,
|
||||||
|
|_, _, _| Ok(()),
|
||||||
|
client_config.store,
|
||||||
|
spec.clone(),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let old_blob_info = db.get_blob_info();
|
||||||
|
let mut new_blob_info = old_blob_info.clone();
|
||||||
|
new_blob_info.oldest_blob_slot = Some(slot);
|
||||||
|
|
||||||
|
info!(
|
||||||
|
previous = ?old_blob_info.oldest_blob_slot,
|
||||||
|
new = ?slot,
|
||||||
|
"Updating oldest blob slot"
|
||||||
|
);
|
||||||
|
|
||||||
|
db.compare_and_set_blob_info_with_write(old_blob_info, new_blob_info)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inspect_blobs<E: EthSpec>(
|
||||||
|
_verify: bool,
|
||||||
|
client_config: ClientConfig,
|
||||||
|
runtime_context: &RuntimeContext<E>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let spec = &runtime_context.eth2_config.spec;
|
||||||
|
let hot_path = client_config.get_db_path();
|
||||||
|
let cold_path = client_config.get_freezer_db_path();
|
||||||
|
let blobs_path = client_config.get_blobs_db_path();
|
||||||
|
|
||||||
|
let db = HotColdDB::<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>::open(
|
||||||
|
&hot_path,
|
||||||
|
&cold_path,
|
||||||
|
&blobs_path,
|
||||||
|
|_, _, _| Ok(()),
|
||||||
|
client_config.store,
|
||||||
|
spec.clone(),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let split = db.get_split_info();
|
||||||
|
let oldest_block_slot = db.get_oldest_block_slot();
|
||||||
|
let deneb_start_slot = spec
|
||||||
|
.deneb_fork_epoch
|
||||||
|
.map_or(Slot::new(0), |epoch| epoch.start_slot(E::slots_per_epoch()));
|
||||||
|
let start_slot = oldest_block_slot.max(deneb_start_slot);
|
||||||
|
|
||||||
|
if oldest_block_slot > deneb_start_slot {
|
||||||
|
info!(
|
||||||
|
start = %deneb_start_slot,
|
||||||
|
end = %(oldest_block_slot - 1),
|
||||||
|
"Missing blobs AND blocks"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut last_block_root = Hash256::ZERO;
|
||||||
|
|
||||||
|
for res in db.forwards_block_roots_iterator_until(start_slot, split.slot, || {
|
||||||
|
db.get_advanced_hot_state(split.block_root, split.slot, split.state_root)?
|
||||||
|
.ok_or(HotColdDBError::MissingSplitState(split.state_root, split.slot).into())
|
||||||
|
.map(|(_, split_state)| (split_state, split.block_root))
|
||||||
|
})? {
|
||||||
|
let (block_root, slot) = res?;
|
||||||
|
|
||||||
|
if last_block_root == block_root {
|
||||||
|
info!("Slot {}: no block", slot);
|
||||||
|
} else if let BlobSidecarListFromRoot::Blobs(blobs) = db.get_blobs(&block_root)? {
|
||||||
|
// FIXME(sproul): do verification here
|
||||||
|
info!("Slot {}: {} blobs stored", slot, blobs.len());
|
||||||
|
} else {
|
||||||
|
// Check whether blobs are expected.
|
||||||
|
let block = db
|
||||||
|
.get_blinded_block(&block_root)?
|
||||||
|
.ok_or(Error::BlockNotFound(block_root))?;
|
||||||
|
|
||||||
|
let num_expected_blobs = block
|
||||||
|
.message()
|
||||||
|
.body()
|
||||||
|
.blob_kzg_commitments()
|
||||||
|
.map_or(0, |blobs| blobs.len());
|
||||||
|
if num_expected_blobs > 0 {
|
||||||
|
warn!(
|
||||||
|
"Slot {}: {} blobs missing ({:?})",
|
||||||
|
slot, num_expected_blobs, block_root
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
info!("Slot {}: block with 0 blobs", slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
last_block_root = block_root;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn import_blobs<E: EthSpec>(
|
||||||
|
source_path: &Path,
|
||||||
|
client_config: ClientConfig,
|
||||||
|
runtime_context: &RuntimeContext<E>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let spec = &runtime_context.eth2_config.spec;
|
||||||
|
let hot_path = client_config.get_db_path();
|
||||||
|
let cold_path = client_config.get_freezer_db_path();
|
||||||
|
let blobs_path = client_config.get_blobs_db_path();
|
||||||
|
|
||||||
|
let db = HotColdDB::<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>::open(
|
||||||
|
&hot_path,
|
||||||
|
&cold_path,
|
||||||
|
&blobs_path,
|
||||||
|
|_, _, _| Ok(()),
|
||||||
|
client_config.store.clone(),
|
||||||
|
spec.clone(),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let source_db = BeaconNodeBackend::<E>::open(&client_config.store, source_path)?;
|
||||||
|
|
||||||
|
let prev_blob_info = db.get_blob_info();
|
||||||
|
let mut oldest_blob_slot = prev_blob_info
|
||||||
|
.oldest_blob_slot
|
||||||
|
.unwrap_or(Slot::new(u64::MAX));
|
||||||
|
|
||||||
|
let mut num_already_known = 0;
|
||||||
|
let mut num_imported = 0;
|
||||||
|
|
||||||
|
let mut ops = vec![];
|
||||||
|
let batch_size = 1024;
|
||||||
|
|
||||||
|
for res in source_db.iter_column(DBColumn::BeaconBlob) {
|
||||||
|
let (block_root, blob_bytes) = res?;
|
||||||
|
|
||||||
|
if db.get_blobs(&block_root)?.len() > 0 {
|
||||||
|
num_already_known += 1;
|
||||||
|
} else {
|
||||||
|
// FIXME(sproul): max len?
|
||||||
|
let blobs = BlobSidecarList::<E>::from_ssz_bytes(&blob_bytes, 64)?;
|
||||||
|
ops.push(KeyValueStoreOp::PutKeyValue(
|
||||||
|
DBColumn::BeaconBlob,
|
||||||
|
block_root.to_vec(),
|
||||||
|
blob_bytes,
|
||||||
|
));
|
||||||
|
|
||||||
|
if let Some(blob) = blobs.first() {
|
||||||
|
oldest_blob_slot = oldest_blob_slot.min(blob.slot());
|
||||||
|
debug!("Imported blobs for slot {} ({:?})", blob.slot(), block_root);
|
||||||
|
}
|
||||||
|
num_imported += 1;
|
||||||
|
|
||||||
|
if ops.len() >= batch_size {
|
||||||
|
db.blobs_db.do_atomically(std::mem::take(&mut ops))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
db.blobs_db.do_atomically(ops)?;
|
||||||
|
|
||||||
|
let mut new_blob_info = prev_blob_info.clone();
|
||||||
|
new_blob_info.oldest_blob_slot = Some(oldest_blob_slot);
|
||||||
|
db.compare_and_set_blob_info_with_write(prev_blob_info, new_blob_info)?;
|
||||||
|
|
||||||
|
info!(
|
||||||
|
imported = num_imported,
|
||||||
|
already_known = num_already_known,
|
||||||
|
"Blobs imported"
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Run the database manager, returning an error string if the operation did not succeed.
|
/// Run the database manager, returning an error string if the operation did not succeed.
|
||||||
pub fn run<E: EthSpec>(
|
pub fn run<E: EthSpec>(
|
||||||
cli_args: &ArgMatches,
|
cli_args: &ArgMatches,
|
||||||
@@ -512,5 +691,14 @@ pub fn run<E: EthSpec>(
|
|||||||
let compact_config = parse_compact_config(compact_config)?;
|
let compact_config = parse_compact_config(compact_config)?;
|
||||||
compact_db::<E>(compact_config, client_config).map_err(format_err)
|
compact_db::<E>(compact_config, client_config).map_err(format_err)
|
||||||
}
|
}
|
||||||
|
cli::DatabaseManagerSubcommand::SetOldestBlobSlot(blob_slot_config) => {
|
||||||
|
set_oldest_blob_slot(blob_slot_config.slot, client_config, &context).map_err(format_err)
|
||||||
|
}
|
||||||
|
cli::DatabaseManagerSubcommand::InspectBlobs(_) => {
|
||||||
|
inspect_blobs(false, client_config, &context).map_err(format_err)
|
||||||
|
}
|
||||||
|
cli::DatabaseManagerSubcommand::ImportBlobs(config) => {
|
||||||
|
import_blobs(&config.source_db, client_config, &context).map_err(format_err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user