mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-04 05:14:33 +00:00
Manual finalization endpoint (#7059)
* Load block roots from fork choice where possible to avoid loading state from disk when serving block by range requests. * Check if the start slot is newer than finalization (`start_slot >= finalized_slot`), and use fork choice in that case. * force finalization endpoint * cleanup * Remove ds store * Don't import blocks that conflict with the split * Disconnect and ban peer if we get blocks conflicting manual checkpoint * immediately commit state to cold db * revert * Fix descent from split check * Add safety check to checkpoint when doing manual finalization. --------- Co-authored-by: Jimmy Chen <jchen.tc@gmail.com> Co-authored-by: Michael Sproul <michael@sigmaprime.io> Co-authored-by: Pawan Dhananjay <pawandhananjay@gmail.com>
This commit is contained in:
@@ -42,7 +42,7 @@ use crate::light_client_optimistic_update_verification::{
|
|||||||
Error as LightClientOptimisticUpdateError, VerifiedLightClientOptimisticUpdate,
|
Error as LightClientOptimisticUpdateError, VerifiedLightClientOptimisticUpdate,
|
||||||
};
|
};
|
||||||
use crate::light_client_server_cache::LightClientServerCache;
|
use crate::light_client_server_cache::LightClientServerCache;
|
||||||
use crate::migrate::BackgroundMigrator;
|
use crate::migrate::{BackgroundMigrator, ManualFinalizationNotification};
|
||||||
use crate::naive_aggregation_pool::{
|
use crate::naive_aggregation_pool::{
|
||||||
AggregatedAttestationMap, Error as NaiveAggregationError, NaiveAggregationPool,
|
AggregatedAttestationMap, Error as NaiveAggregationError, NaiveAggregationPool,
|
||||||
SyncContributionAggregateMap,
|
SyncContributionAggregateMap,
|
||||||
@@ -118,8 +118,8 @@ use std::sync::Arc;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator};
|
use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator};
|
||||||
use store::{
|
use store::{
|
||||||
BlobSidecarListFromRoot, DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore,
|
BlobSidecarListFromRoot, DatabaseBlock, Error as DBError, HotColdDB, HotStateSummary,
|
||||||
KeyValueStoreOp, StoreItem, StoreOp,
|
KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
|
||||||
};
|
};
|
||||||
use task_executor::{ShutdownReason, TaskExecutor};
|
use task_executor::{ShutdownReason, TaskExecutor};
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
@@ -1707,6 +1707,41 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn manually_finalize_state(
|
||||||
|
&self,
|
||||||
|
state_root: Hash256,
|
||||||
|
checkpoint: Checkpoint,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let HotStateSummary {
|
||||||
|
slot,
|
||||||
|
latest_block_root,
|
||||||
|
..
|
||||||
|
} = self
|
||||||
|
.store
|
||||||
|
.load_hot_state_summary(&state_root)
|
||||||
|
.map_err(BeaconChainError::DBError)?
|
||||||
|
.ok_or(BeaconChainError::MissingHotStateSummary(state_root))?;
|
||||||
|
|
||||||
|
if slot != checkpoint.epoch.start_slot(T::EthSpec::slots_per_epoch())
|
||||||
|
|| latest_block_root != *checkpoint.root
|
||||||
|
{
|
||||||
|
return Err(BeaconChainError::InvalidCheckpoint {
|
||||||
|
state_root,
|
||||||
|
checkpoint,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let notif = ManualFinalizationNotification {
|
||||||
|
state_root: state_root.into(),
|
||||||
|
checkpoint,
|
||||||
|
head_tracker: self.head_tracker.clone(),
|
||||||
|
genesis_block_root: self.genesis_block_root,
|
||||||
|
};
|
||||||
|
|
||||||
|
self.store_migrator.process_manual_finalization(notif);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns an aggregated `Attestation`, if any, that has a matching `attestation.data`.
|
/// Returns an aggregated `Attestation`, if any, that has a matching `attestation.data`.
|
||||||
///
|
///
|
||||||
/// The attestation will be obtained from `self.naive_aggregation_pool`.
|
/// The attestation will be obtained from `self.naive_aggregation_pool`.
|
||||||
|
|||||||
@@ -1782,7 +1782,12 @@ pub fn check_block_is_finalized_checkpoint_or_descendant<
|
|||||||
fork_choice: &BeaconForkChoice<T>,
|
fork_choice: &BeaconForkChoice<T>,
|
||||||
block: B,
|
block: B,
|
||||||
) -> Result<B, BlockError> {
|
) -> Result<B, BlockError> {
|
||||||
if fork_choice.is_finalized_checkpoint_or_descendant(block.parent_root()) {
|
// If we have a split block newer than finalization then we also ban blocks which are not
|
||||||
|
// descended from that split block.
|
||||||
|
let split = chain.store.get_split_info();
|
||||||
|
if fork_choice.is_finalized_checkpoint_or_descendant(block.parent_root())
|
||||||
|
&& fork_choice.is_descendant(split.block_root, block.parent_root())
|
||||||
|
{
|
||||||
Ok(block)
|
Ok(block)
|
||||||
} else {
|
} else {
|
||||||
// If fork choice does *not* consider the parent to be a descendant of the finalized block,
|
// If fork choice does *not* consider the parent to be a descendant of the finalized block,
|
||||||
|
|||||||
@@ -61,6 +61,7 @@ pub enum BeaconChainError {
|
|||||||
ForkChoiceStoreError(ForkChoiceStoreError),
|
ForkChoiceStoreError(ForkChoiceStoreError),
|
||||||
MissingBeaconBlock(Hash256),
|
MissingBeaconBlock(Hash256),
|
||||||
MissingBeaconState(Hash256),
|
MissingBeaconState(Hash256),
|
||||||
|
MissingHotStateSummary(Hash256),
|
||||||
SlotProcessingError(SlotProcessingError),
|
SlotProcessingError(SlotProcessingError),
|
||||||
EpochProcessingError(EpochProcessingError),
|
EpochProcessingError(EpochProcessingError),
|
||||||
StateAdvanceError(StateAdvanceError),
|
StateAdvanceError(StateAdvanceError),
|
||||||
@@ -181,9 +182,9 @@ pub enum BeaconChainError {
|
|||||||
execution_block_hash: Option<ExecutionBlockHash>,
|
execution_block_hash: Option<ExecutionBlockHash>,
|
||||||
},
|
},
|
||||||
ForkchoiceUpdate(execution_layer::Error),
|
ForkchoiceUpdate(execution_layer::Error),
|
||||||
FinalizedCheckpointMismatch {
|
InvalidCheckpoint {
|
||||||
head_state: Checkpoint,
|
state_root: Hash256,
|
||||||
fork_choice: Hash256,
|
checkpoint: Checkpoint,
|
||||||
},
|
},
|
||||||
InvalidSlot(Slot),
|
InvalidSlot(Slot),
|
||||||
HeadBlockNotFullyVerified {
|
HeadBlockNotFullyVerified {
|
||||||
|
|||||||
@@ -124,14 +124,22 @@ pub enum Notification {
|
|||||||
Finalization(FinalizationNotification),
|
Finalization(FinalizationNotification),
|
||||||
Reconstruction,
|
Reconstruction,
|
||||||
PruneBlobs(Epoch),
|
PruneBlobs(Epoch),
|
||||||
|
ManualFinalization(ManualFinalizationNotification),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ManualFinalizationNotification {
|
||||||
|
pub state_root: BeaconStateHash,
|
||||||
|
pub checkpoint: Checkpoint,
|
||||||
|
pub head_tracker: Arc<HeadTracker>,
|
||||||
|
pub genesis_block_root: Hash256,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct FinalizationNotification {
|
pub struct FinalizationNotification {
|
||||||
finalized_state_root: BeaconStateHash,
|
pub finalized_state_root: BeaconStateHash,
|
||||||
finalized_checkpoint: Checkpoint,
|
pub finalized_checkpoint: Checkpoint,
|
||||||
head_tracker: Arc<HeadTracker>,
|
pub head_tracker: Arc<HeadTracker>,
|
||||||
prev_migration: Arc<Mutex<PrevMigration>>,
|
pub prev_migration: Arc<Mutex<PrevMigration>>,
|
||||||
genesis_block_root: Hash256,
|
pub genesis_block_root: Hash256,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> {
|
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> {
|
||||||
@@ -190,6 +198,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn process_manual_finalization(&self, notif: ManualFinalizationNotification) {
|
||||||
|
if let Some(Notification::ManualFinalization(notif)) =
|
||||||
|
self.send_background_notification(Notification::ManualFinalization(notif))
|
||||||
|
{
|
||||||
|
Self::run_manual_migration(self.db.clone(), notif, &self.log);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn process_reconstruction(&self) {
|
pub fn process_reconstruction(&self) {
|
||||||
if let Some(Notification::Reconstruction) =
|
if let Some(Notification::Reconstruction) =
|
||||||
self.send_background_notification(Notification::Reconstruction)
|
self.send_background_notification(Notification::Reconstruction)
|
||||||
@@ -289,6 +305,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn run_manual_migration(
|
||||||
|
db: Arc<HotColdDB<E, Hot, Cold>>,
|
||||||
|
notif: ManualFinalizationNotification,
|
||||||
|
log: &Logger,
|
||||||
|
) {
|
||||||
|
// We create a "dummy" prev migration
|
||||||
|
let prev_migration = PrevMigration {
|
||||||
|
epoch: Epoch::new(1),
|
||||||
|
epochs_per_migration: 2,
|
||||||
|
};
|
||||||
|
let notif = FinalizationNotification {
|
||||||
|
finalized_state_root: notif.state_root,
|
||||||
|
finalized_checkpoint: notif.checkpoint,
|
||||||
|
head_tracker: notif.head_tracker,
|
||||||
|
prev_migration: Arc::new(prev_migration.into()),
|
||||||
|
genesis_block_root: notif.genesis_block_root,
|
||||||
|
};
|
||||||
|
Self::run_migration(db, notif, log);
|
||||||
|
}
|
||||||
|
|
||||||
/// Perform the actual work of `process_finalization`.
|
/// Perform the actual work of `process_finalization`.
|
||||||
fn run_migration(
|
fn run_migration(
|
||||||
db: Arc<HotColdDB<E, Hot, Cold>>,
|
db: Arc<HotColdDB<E, Hot, Cold>>,
|
||||||
@@ -422,16 +458,27 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
|||||||
while let Ok(notif) = rx.recv() {
|
while let Ok(notif) = rx.recv() {
|
||||||
let mut reconstruction_notif = None;
|
let mut reconstruction_notif = None;
|
||||||
let mut finalization_notif = None;
|
let mut finalization_notif = None;
|
||||||
|
let mut manual_finalization_notif = None;
|
||||||
let mut prune_blobs_notif = None;
|
let mut prune_blobs_notif = None;
|
||||||
match notif {
|
match notif {
|
||||||
Notification::Reconstruction => reconstruction_notif = Some(notif),
|
Notification::Reconstruction => reconstruction_notif = Some(notif),
|
||||||
Notification::Finalization(fin) => finalization_notif = Some(fin),
|
Notification::Finalization(fin) => finalization_notif = Some(fin),
|
||||||
|
Notification::ManualFinalization(fin) => manual_finalization_notif = Some(fin),
|
||||||
Notification::PruneBlobs(dab) => prune_blobs_notif = Some(dab),
|
Notification::PruneBlobs(dab) => prune_blobs_notif = Some(dab),
|
||||||
}
|
}
|
||||||
// Read the rest of the messages in the channel, taking the best of each type.
|
// Read the rest of the messages in the channel, taking the best of each type.
|
||||||
for notif in rx.try_iter() {
|
for notif in rx.try_iter() {
|
||||||
match notif {
|
match notif {
|
||||||
Notification::Reconstruction => reconstruction_notif = Some(notif),
|
Notification::Reconstruction => reconstruction_notif = Some(notif),
|
||||||
|
Notification::ManualFinalization(fin) => {
|
||||||
|
if let Some(current) = manual_finalization_notif.as_mut() {
|
||||||
|
if fin.checkpoint.epoch > current.checkpoint.epoch {
|
||||||
|
*current = fin;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
manual_finalization_notif = Some(fin);
|
||||||
|
}
|
||||||
|
}
|
||||||
Notification::Finalization(fin) => {
|
Notification::Finalization(fin) => {
|
||||||
if let Some(current) = finalization_notif.as_mut() {
|
if let Some(current) = finalization_notif.as_mut() {
|
||||||
if fin.finalized_checkpoint.epoch
|
if fin.finalized_checkpoint.epoch
|
||||||
@@ -454,6 +501,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
|||||||
if let Some(fin) = finalization_notif {
|
if let Some(fin) = finalization_notif {
|
||||||
Self::run_migration(db.clone(), fin, &log);
|
Self::run_migration(db.clone(), fin, &log);
|
||||||
}
|
}
|
||||||
|
if let Some(fin) = manual_finalization_notif {
|
||||||
|
Self::run_manual_migration(db.clone(), fin, &log);
|
||||||
|
}
|
||||||
if let Some(dab) = prune_blobs_notif {
|
if let Some(dab) = prune_blobs_notif {
|
||||||
Self::run_prune_blobs(db.clone(), dab, &log);
|
Self::run_prune_blobs(db.clone(), dab, &log);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -86,11 +86,11 @@ use tokio_stream::{
|
|||||||
};
|
};
|
||||||
use types::{
|
use types::{
|
||||||
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
|
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
|
||||||
AttesterSlashing, BeaconStateError, ChainSpec, CommitteeCache, ConfigAndPreset, Epoch, EthSpec,
|
AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset,
|
||||||
ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing,
|
Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData,
|
||||||
RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
|
ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock,
|
||||||
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
|
SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData,
|
||||||
SyncCommitteeMessage, SyncContributionData,
|
SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData,
|
||||||
};
|
};
|
||||||
use validator::pubkey_to_validator_index;
|
use validator::pubkey_to_validator_index;
|
||||||
use version::{
|
use version::{
|
||||||
@@ -4133,6 +4133,35 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// POST lighthouse/finalize
|
||||||
|
let post_lighthouse_finalize = warp::path("lighthouse")
|
||||||
|
.and(warp::path("finalize"))
|
||||||
|
.and(warp::path::end())
|
||||||
|
.and(warp_utils::json::json())
|
||||||
|
.and(task_spawner_filter.clone())
|
||||||
|
.and(chain_filter.clone())
|
||||||
|
.then(
|
||||||
|
|request_data: api_types::ManualFinalizationRequestData,
|
||||||
|
task_spawner: TaskSpawner<T::EthSpec>,
|
||||||
|
chain: Arc<BeaconChain<T>>| {
|
||||||
|
task_spawner.blocking_json_task(Priority::P0, move || {
|
||||||
|
let checkpoint = Checkpoint {
|
||||||
|
epoch: request_data.epoch,
|
||||||
|
root: request_data.block_root,
|
||||||
|
};
|
||||||
|
|
||||||
|
chain
|
||||||
|
.manually_finalize_state(request_data.state_root, checkpoint)
|
||||||
|
.map(|_| api_types::GenericResponse::from(request_data))
|
||||||
|
.map_err(|e| {
|
||||||
|
warp_utils::reject::custom_bad_request(format!(
|
||||||
|
"Failed to finalize state due to error: {e:?}"
|
||||||
|
))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
// POST lighthouse/liveness
|
// POST lighthouse/liveness
|
||||||
let post_lighthouse_liveness = warp::path("lighthouse")
|
let post_lighthouse_liveness = warp::path("lighthouse")
|
||||||
.and(warp::path("liveness"))
|
.and(warp::path("liveness"))
|
||||||
@@ -4932,6 +4961,7 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.uor(post_lighthouse_block_rewards)
|
.uor(post_lighthouse_block_rewards)
|
||||||
.uor(post_lighthouse_ui_validator_metrics)
|
.uor(post_lighthouse_ui_validator_metrics)
|
||||||
.uor(post_lighthouse_ui_validator_info)
|
.uor(post_lighthouse_ui_validator_info)
|
||||||
|
.uor(post_lighthouse_finalize)
|
||||||
.recover(warp_utils::reject::handle_rejection),
|
.recover(warp_utils::reject::handle_rejection),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -740,6 +740,19 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
debug!(self.log, "Finalized or earlier block processed";);
|
debug!(self.log, "Finalized or earlier block processed";);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
BlockError::NotFinalizedDescendant { block_parent_root } => {
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Not syncing to a chain that conflicts with the canonical or manual finalized checkpoint"
|
||||||
|
);
|
||||||
|
Err(ChainSegmentFailed {
|
||||||
|
message: format!(
|
||||||
|
"Block with parent_root {} conflicts with our checkpoint state",
|
||||||
|
block_parent_root
|
||||||
|
),
|
||||||
|
peer_action: Some(PeerAction::Fatal),
|
||||||
|
})
|
||||||
|
}
|
||||||
BlockError::GenesisBlock => {
|
BlockError::GenesisBlock => {
|
||||||
debug!(self.log, "Genesis block was processed");
|
debug!(self.log, "Genesis block was processed");
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -1424,6 +1424,13 @@ pub struct StandardLivenessResponseData {
|
|||||||
pub is_live: bool,
|
pub is_live: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct ManualFinalizationRequestData {
|
||||||
|
pub state_root: Hash256,
|
||||||
|
pub epoch: Epoch,
|
||||||
|
pub block_root: Hash256,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub struct LivenessRequestData {
|
pub struct LivenessRequestData {
|
||||||
pub epoch: Epoch,
|
pub epoch: Epoch,
|
||||||
|
|||||||
@@ -1255,6 +1255,11 @@ where
|
|||||||
.is_finalized_checkpoint_or_descendant::<E>(block_root)
|
.is_finalized_checkpoint_or_descendant::<E>(block_root)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn is_descendant(&self, ancestor_root: Hash256, descendant_root: Hash256) -> bool {
|
||||||
|
self.proto_array
|
||||||
|
.is_descendant(ancestor_root, descendant_root)
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns `Ok(true)` if `block_root` has been imported optimistically or deemed invalid.
|
/// Returns `Ok(true)` if `block_root` has been imported optimistically or deemed invalid.
|
||||||
///
|
///
|
||||||
/// Returns `Ok(false)` if `block_root`'s execution payload has been elected as fully VALID, if
|
/// Returns `Ok(false)` if `block_root`'s execution payload has been elected as fully VALID, if
|
||||||
|
|||||||
Reference in New Issue
Block a user