mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-07 08:52:54 +00:00
Implement import-blobs
This commit is contained in:
@@ -15,3 +15,4 @@ types = { workspace = true }
|
|||||||
slog = { workspace = true }
|
slog = { workspace = true }
|
||||||
strum = { workspace = true }
|
strum = { workspace = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
|
ethereum_ssz = { workspace = true }
|
||||||
|
|||||||
@@ -82,6 +82,7 @@ pub enum DatabaseManagerSubcommand {
|
|||||||
Compact(Compact),
|
Compact(Compact),
|
||||||
SetOldestBlobSlot(SetOldestBlobSlot),
|
SetOldestBlobSlot(SetOldestBlobSlot),
|
||||||
InspectBlobs(InspectBlobs),
|
InspectBlobs(InspectBlobs),
|
||||||
|
ImportBlobs(ImportBlobs),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Parser, Clone, Deserialize, Serialize, Debug)]
|
#[derive(Parser, Clone, Deserialize, Serialize, Debug)]
|
||||||
@@ -244,8 +245,15 @@ pub struct SetOldestBlobSlot {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Parser, Clone, Deserialize, Serialize, Debug)]
|
#[derive(Parser, Clone, Deserialize, Serialize, Debug)]
|
||||||
#[clap(about = "Produce a summary of blob availability in the databasue.")]
|
#[clap(about = "Produce a summary of blob availability in the database.")]
|
||||||
pub struct InspectBlobs {
|
pub struct InspectBlobs {
|
||||||
#[clap(long, help = "Verify blob data integrity.", display_order = 0)]
|
#[clap(long, help = "Verify blob data integrity.", display_order = 0)]
|
||||||
pub verify: bool,
|
pub verify: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Parser, Clone, Deserialize, Serialize, Debug)]
|
||||||
|
#[clap(about = "Import blobs from another node's blob database.")]
|
||||||
|
pub struct ImportBlobs {
|
||||||
|
#[clap(long, help = "Path to the database to import", display_order = 0)]
|
||||||
|
pub source_db: PathBuf,
|
||||||
|
}
|
||||||
|
|||||||
@@ -12,18 +12,19 @@ use clap::ValueEnum;
|
|||||||
use cli::{Compact, Inspect};
|
use cli::{Compact, Inspect};
|
||||||
use environment::{Environment, RuntimeContext};
|
use environment::{Environment, RuntimeContext};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use slog::{info, warn, Logger};
|
use slog::{debug, info, warn, Logger};
|
||||||
|
use ssz::Decode;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::path::PathBuf;
|
use std::path::{Path, PathBuf};
|
||||||
use store::metadata::STATE_UPPER_LIMIT_NO_RETAIN;
|
use store::metadata::STATE_UPPER_LIMIT_NO_RETAIN;
|
||||||
use store::{
|
use store::{
|
||||||
errors::Error,
|
errors::Error,
|
||||||
metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION},
|
metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION},
|
||||||
DBColumn, HotColdDB, KeyValueStore, LevelDB,
|
DBColumn, HotColdDB, KeyValueStore, LevelDB, StoreItem,
|
||||||
};
|
};
|
||||||
use strum::{EnumString, EnumVariantNames};
|
use strum::{EnumString, EnumVariantNames};
|
||||||
use types::{BeaconState, EthSpec, Hash256, Slot};
|
use types::{BeaconState, BlobSidecarList, EthSpec, Hash256, Slot};
|
||||||
|
|
||||||
fn parse_client_config<E: EthSpec>(
|
fn parse_client_config<E: EthSpec>(
|
||||||
cli_args: &ArgMatches,
|
cli_args: &ArgMatches,
|
||||||
@@ -583,6 +584,76 @@ fn inspect_blobs<E: EthSpec>(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn import_blobs<E: EthSpec>(
|
||||||
|
source_path: &Path,
|
||||||
|
client_config: ClientConfig,
|
||||||
|
runtime_context: &RuntimeContext<E>,
|
||||||
|
log: Logger,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let spec = &runtime_context.eth2_config.spec;
|
||||||
|
let hot_path = client_config.get_db_path();
|
||||||
|
let cold_path = client_config.get_freezer_db_path();
|
||||||
|
let blobs_path = client_config.get_blobs_db_path();
|
||||||
|
|
||||||
|
let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
|
||||||
|
&hot_path,
|
||||||
|
&cold_path,
|
||||||
|
&blobs_path,
|
||||||
|
|_, _, _| Ok(()),
|
||||||
|
client_config.store,
|
||||||
|
spec.clone(),
|
||||||
|
log.clone(),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let source_db = LevelDB::<E>::open(source_path)?;
|
||||||
|
|
||||||
|
let prev_blob_info = db.get_blob_info();
|
||||||
|
let mut oldest_blob_slot = prev_blob_info
|
||||||
|
.oldest_blob_slot
|
||||||
|
.unwrap_or(Slot::new(u64::MAX));
|
||||||
|
|
||||||
|
let mut num_already_known = 0;
|
||||||
|
let mut num_imported = 0;
|
||||||
|
|
||||||
|
let mut ops = vec![];
|
||||||
|
let batch_size = 1024;
|
||||||
|
|
||||||
|
for res in source_db.iter_column(DBColumn::BeaconBlob) {
|
||||||
|
let (block_root, blob_bytes) = res?;
|
||||||
|
|
||||||
|
if db.get_blobs(&block_root)?.is_some() {
|
||||||
|
num_already_known += 1;
|
||||||
|
} else {
|
||||||
|
let blobs = BlobSidecarList::<E>::from_ssz_bytes(&blob_bytes)?;
|
||||||
|
ops.push(blobs.as_kv_store_op(block_root));
|
||||||
|
|
||||||
|
if let Some(blob) = blobs.first() {
|
||||||
|
oldest_blob_slot = oldest_blob_slot.min(blob.slot());
|
||||||
|
debug!(log, "Imported blobs for slot {}", blob.slot());
|
||||||
|
}
|
||||||
|
num_imported += 1;
|
||||||
|
|
||||||
|
if ops.len() >= batch_size {
|
||||||
|
db.blobs_db.do_atomically(std::mem::take(&mut ops))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
db.blobs_db.do_atomically(ops)?;
|
||||||
|
|
||||||
|
let mut new_blob_info = prev_blob_info.clone();
|
||||||
|
new_blob_info.oldest_blob_slot = Some(oldest_blob_slot);
|
||||||
|
db.compare_and_set_blob_info_with_write(prev_blob_info, new_blob_info)?;
|
||||||
|
|
||||||
|
info!(
|
||||||
|
log,
|
||||||
|
"Blobs imported";
|
||||||
|
"imported" => num_imported,
|
||||||
|
"already_known" => num_already_known
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Run the database manager, returning an error string if the operation did not succeed.
|
/// Run the database manager, returning an error string if the operation did not succeed.
|
||||||
pub fn run<E: EthSpec>(
|
pub fn run<E: EthSpec>(
|
||||||
cli_args: &ArgMatches,
|
cli_args: &ArgMatches,
|
||||||
@@ -647,5 +718,8 @@ pub fn run<E: EthSpec>(
|
|||||||
cli::DatabaseManagerSubcommand::InspectBlobs(_) => {
|
cli::DatabaseManagerSubcommand::InspectBlobs(_) => {
|
||||||
inspect_blobs(false, client_config, &context, log).map_err(format_err)
|
inspect_blobs(false, client_config, &context, log).map_err(format_err)
|
||||||
}
|
}
|
||||||
|
cli::DatabaseManagerSubcommand::ImportBlobs(config) => {
|
||||||
|
import_blobs(&config.source_db, client_config, &context, log).map_err(format_err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user