From 299c0b2d17d1393cc8bfb9a35fb796aae2380387 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 7 Nov 2024 12:10:43 +1100 Subject: [PATCH] Implement import-blobs --- database_manager/Cargo.toml | 1 + database_manager/src/cli.rs | 10 ++++- database_manager/src/lib.rs | 82 +++++++++++++++++++++++++++++++++++-- 3 files changed, 88 insertions(+), 5 deletions(-) diff --git a/database_manager/Cargo.toml b/database_manager/Cargo.toml index 96176f3fba..f97aa50ee5 100644 --- a/database_manager/Cargo.toml +++ b/database_manager/Cargo.toml @@ -15,3 +15,4 @@ types = { workspace = true } slog = { workspace = true } strum = { workspace = true } serde = { workspace = true } +ethereum_ssz = { workspace = true } diff --git a/database_manager/src/cli.rs b/database_manager/src/cli.rs index 864fa244eb..7ef978fd3e 100644 --- a/database_manager/src/cli.rs +++ b/database_manager/src/cli.rs @@ -82,6 +82,7 @@ pub enum DatabaseManagerSubcommand { Compact(Compact), SetOldestBlobSlot(SetOldestBlobSlot), InspectBlobs(InspectBlobs), + ImportBlobs(ImportBlobs), } #[derive(Parser, Clone, Deserialize, Serialize, Debug)] @@ -244,8 +245,15 @@ pub struct SetOldestBlobSlot { } #[derive(Parser, Clone, Deserialize, Serialize, Debug)] -#[clap(about = "Produce a summary of blob availability in the databasue.")] +#[clap(about = "Produce a summary of blob availability in the database.")] pub struct InspectBlobs { #[clap(long, help = "Verify blob data integrity.", display_order = 0)] pub verify: bool, } + +#[derive(Parser, Clone, Deserialize, Serialize, Debug)] +#[clap(about = "Import blobs from another node's blob database.")] +pub struct ImportBlobs { + #[clap(long, help = "Path to the database to import", display_order = 0)] + pub source_db: PathBuf, +} diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 003862d723..4e25fb1523 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -12,18 +12,19 @@ use clap::ValueEnum; use cli::{Compact, Inspect}; use environment::{Environment, RuntimeContext}; use serde::{Deserialize, Serialize}; -use slog::{info, warn, Logger}; +use slog::{debug, info, warn, Logger}; +use ssz::Decode; use std::fs; use std::io::Write; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use store::metadata::STATE_UPPER_LIMIT_NO_RETAIN; use store::{ errors::Error, metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION}, - DBColumn, HotColdDB, KeyValueStore, LevelDB, + DBColumn, HotColdDB, KeyValueStore, LevelDB, StoreItem, }; use strum::{EnumString, EnumVariantNames}; -use types::{BeaconState, EthSpec, Hash256, Slot}; +use types::{BeaconState, BlobSidecarList, EthSpec, Hash256, Slot}; fn parse_client_config( cli_args: &ArgMatches, @@ -583,6 +584,76 @@ fn inspect_blobs( Ok(()) } +fn import_blobs( + source_path: &Path, + client_config: ClientConfig, + runtime_context: &RuntimeContext, + log: Logger, +) -> Result<(), Error> { + let spec = &runtime_context.eth2_config.spec; + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); + + let db = HotColdDB::, LevelDB>::open( + &hot_path, + &cold_path, + &blobs_path, + |_, _, _| Ok(()), + client_config.store, + spec.clone(), + log.clone(), + )?; + + let source_db = LevelDB::::open(source_path)?; + + let prev_blob_info = db.get_blob_info(); + let mut oldest_blob_slot = prev_blob_info + .oldest_blob_slot + .unwrap_or(Slot::new(u64::MAX)); + + let mut num_already_known = 0; + let mut num_imported = 0; + + let mut ops = vec![]; + let batch_size = 1024; + + for res in source_db.iter_column(DBColumn::BeaconBlob) { + let (block_root, blob_bytes) = res?; + + if db.get_blobs(&block_root)?.is_some() { + num_already_known += 1; + } else { + let blobs = BlobSidecarList::::from_ssz_bytes(&blob_bytes)?; + ops.push(blobs.as_kv_store_op(block_root)); + + if let Some(blob) = blobs.first() { + oldest_blob_slot = oldest_blob_slot.min(blob.slot()); + debug!(log, "Imported blobs for slot {}", blob.slot()); + } + num_imported += 1; + + if ops.len() >= batch_size { + db.blobs_db.do_atomically(std::mem::take(&mut ops))?; + } + } + } + db.blobs_db.do_atomically(ops)?; + + let mut new_blob_info = prev_blob_info.clone(); + new_blob_info.oldest_blob_slot = Some(oldest_blob_slot); + db.compare_and_set_blob_info_with_write(prev_blob_info, new_blob_info)?; + + info!( + log, + "Blobs imported"; + "imported" => num_imported, + "already_known" => num_already_known + ); + + Ok(()) +} + /// Run the database manager, returning an error string if the operation did not succeed. pub fn run( cli_args: &ArgMatches, @@ -647,5 +718,8 @@ pub fn run( cli::DatabaseManagerSubcommand::InspectBlobs(_) => { inspect_blobs(false, client_config, &context, log).map_err(format_err) } + cli::DatabaseManagerSubcommand::ImportBlobs(config) => { + import_blobs(&config.source_db, client_config, &context, log).map_err(format_err) + } } }