mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-06 18:21:45 +00:00
Manual compaction endpoint backport (#7104)
Backports: - https://github.com/sigp/lighthouse/pull/7072 To: - https://github.com/sigp/lighthouse/issues/7039 #7103 should be merged first This PR introduces an endpoint that allows users to manually trigger background compaction.
This commit is contained in:
@@ -1711,6 +1711,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn manually_compact_database(&self) {
|
||||
self.store_migrator.process_manual_compaction();
|
||||
}
|
||||
|
||||
pub fn manually_finalize_state(
|
||||
&self,
|
||||
state_root: Hash256,
|
||||
|
||||
@@ -125,6 +125,7 @@ pub enum Notification {
|
||||
Reconstruction,
|
||||
PruneBlobs(Epoch),
|
||||
ManualFinalization(ManualFinalizationNotification),
|
||||
ManualCompaction,
|
||||
}
|
||||
|
||||
pub struct ManualFinalizationNotification {
|
||||
@@ -198,6 +199,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn process_manual_compaction(&self) {
|
||||
if let Some(Notification::ManualCompaction) =
|
||||
self.send_background_notification(Notification::ManualCompaction)
|
||||
{
|
||||
Self::run_manual_compaction(self.db.clone(), &self.log);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn process_manual_finalization(&self, notif: ManualFinalizationNotification) {
|
||||
if let Some(Notification::ManualFinalization(notif)) =
|
||||
self.send_background_notification(Notification::ManualFinalization(notif))
|
||||
@@ -446,6 +455,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
||||
debug!(log, "Database consolidation complete");
|
||||
}
|
||||
|
||||
fn run_manual_compaction(db: Arc<HotColdDB<E, Hot, Cold>>, log: &Logger) {
|
||||
debug!(log, "Running manual compaction");
|
||||
if let Err(e) = db.compact() {
|
||||
warn!(log, "Database compaction failed"; "error" => format!("{:?}", e));
|
||||
} else {
|
||||
debug!(log, "Manual compaction completed");
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a new child thread to run the migration process.
|
||||
///
|
||||
/// Return a channel handle for sending requests to the thread.
|
||||
@@ -460,17 +478,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
||||
let mut reconstruction_notif = None;
|
||||
let mut finalization_notif = None;
|
||||
let mut manual_finalization_notif = None;
|
||||
let mut manual_compaction_notif = None;
|
||||
let mut prune_blobs_notif = None;
|
||||
match notif {
|
||||
Notification::Reconstruction => reconstruction_notif = Some(notif),
|
||||
Notification::Finalization(fin) => finalization_notif = Some(fin),
|
||||
Notification::ManualFinalization(fin) => manual_finalization_notif = Some(fin),
|
||||
Notification::PruneBlobs(dab) => prune_blobs_notif = Some(dab),
|
||||
Notification::ManualCompaction => manual_compaction_notif = Some(notif),
|
||||
}
|
||||
// Read the rest of the messages in the channel, taking the best of each type.
|
||||
for notif in rx.try_iter() {
|
||||
match notif {
|
||||
Notification::Reconstruction => reconstruction_notif = Some(notif),
|
||||
Notification::ManualCompaction => manual_compaction_notif = Some(notif),
|
||||
Notification::ManualFinalization(fin) => {
|
||||
if let Some(current) = manual_finalization_notif.as_mut() {
|
||||
if fin.checkpoint.epoch > current.checkpoint.epoch {
|
||||
@@ -511,6 +532,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
||||
if reconstruction_notif.is_some() {
|
||||
Self::run_reconstruction(db.clone(), Some(inner_tx.clone()), &log);
|
||||
}
|
||||
if manual_compaction_notif.is_some() {
|
||||
Self::run_manual_compaction(db.clone(), &log);
|
||||
}
|
||||
}
|
||||
});
|
||||
(tx, thread)
|
||||
|
||||
@@ -4105,6 +4105,23 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
},
|
||||
);
|
||||
|
||||
// POST lighthouse/compaction
|
||||
let post_lighthouse_compaction = warp::path("lighthouse")
|
||||
.and(warp::path("compaction"))
|
||||
.and(warp::path::end())
|
||||
.and(task_spawner_filter.clone())
|
||||
.and(chain_filter.clone())
|
||||
.then(
|
||||
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
|
||||
task_spawner.blocking_json_task(Priority::P0, move || {
|
||||
chain.manually_compact_database();
|
||||
Ok(api_types::GenericResponse::from(String::from(
|
||||
"Triggered manual compaction",
|
||||
)))
|
||||
})
|
||||
},
|
||||
);
|
||||
|
||||
// POST lighthouse/liveness
|
||||
let post_lighthouse_liveness = warp::path("lighthouse")
|
||||
.and(warp::path("liveness"))
|
||||
@@ -4878,6 +4895,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
.uor(post_lighthouse_ui_validator_metrics)
|
||||
.uor(post_lighthouse_ui_validator_info)
|
||||
.uor(post_lighthouse_finalize)
|
||||
.uor(post_lighthouse_compaction)
|
||||
.recover(warp_utils::reject::handle_rejection),
|
||||
),
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user