mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-20 13:24:44 +00:00
Make slashing protection import more resilient (#2598)
## Issue Addressed Closes #2419 ## Proposed Changes Address a long-standing issue with the import of slashing protection data where the import would fail due to the data appearing slashable w.r.t the existing database. Importing is now idempotent, and will have no issues importing data that has been handed back and forth between different validator clients, or different implementations. The implementation works by updating the high and low watermarks if they need updating, and not attempting to check if the input is slashable w.r.t itself or the database. This is a strengthening of the minification that we started to do by default since #2380, and what Teku has been doing since the beginning. ## Additional Info The only feature we lose by doing this is the ability to do non-minified imports of clock drifted messages (cf. Prysm on Medalla). In theory, with the previous implementation we could import all the messages in case of clock drift and be aware of the "gap" between the real present time and the messages signed in the far future. _However_ for attestations this is close to useless, as the source epoch will advance as soon as justification occurs, which will require us to make slashable attestations with respect to our bogus attestation(s). E.g. if I sign an attestation 100=>200 when the current epoch is 101, then I won't be able to vote in any epochs prior to 101 becoming justified because 101=>102, 101=>103, etc are all surrounded by 100=>200. Seeing as signing attestations gets blocked almost immediately in this case regardless of our import behaviour, there's no point trying to handle it. For blocks the situation is more hopeful due to the lack of surrounds, but losing block proposals from validators who by definition can't attest doesn't seem like an issue (the other block proposers can pick up the slack).
This commit is contained in:
@@ -4,7 +4,7 @@ use crate::interchange::{
|
||||
};
|
||||
use crate::signed_attestation::InvalidAttestation;
|
||||
use crate::signed_block::InvalidBlock;
|
||||
use crate::{hash256_from_row, NotSafe, Safe, SignedAttestation, SignedBlock, SigningRoot};
|
||||
use crate::{signing_root_from_row, NotSafe, Safe, SignedAttestation, SignedBlock, SigningRoot};
|
||||
use filesystem::restrict_file_permissions;
|
||||
use r2d2_sqlite::SqliteConnectionManager;
|
||||
use rusqlite::{params, OptionalExtension, Transaction, TransactionBehavior};
|
||||
@@ -403,7 +403,7 @@ impl SlashingDatabase {
|
||||
txn.execute(
|
||||
"INSERT INTO signed_blocks (validator_id, slot, signing_root)
|
||||
VALUES (?1, ?2, ?3)",
|
||||
params![validator_id, slot, signing_root.to_hash256().as_bytes()],
|
||||
params![validator_id, slot, signing_root.to_hash256_raw().as_bytes()],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -429,7 +429,7 @@ impl SlashingDatabase {
|
||||
validator_id,
|
||||
att_source_epoch,
|
||||
att_target_epoch,
|
||||
att_signing_root.to_hash256().as_bytes()
|
||||
att_signing_root.to_hash256_raw().as_bytes()
|
||||
],
|
||||
)?;
|
||||
Ok(())
|
||||
@@ -612,65 +612,86 @@ impl SlashingDatabase {
|
||||
|
||||
pub fn import_interchange_record(
|
||||
&self,
|
||||
mut record: InterchangeData,
|
||||
record: InterchangeData,
|
||||
txn: &Transaction,
|
||||
) -> Result<ValidatorSummary, NotSafe> {
|
||||
self.register_validators_in_txn(std::iter::once(&record.pubkey), txn)?;
|
||||
let pubkey = &record.pubkey;
|
||||
|
||||
// Insert all signed blocks, sorting them so that the minimum bounds are not
|
||||
// violated by blocks earlier in the file.
|
||||
record.signed_blocks.sort_unstable_by_key(|b| b.slot);
|
||||
for block in &record.signed_blocks {
|
||||
self.check_and_insert_block_signing_root_txn(
|
||||
&record.pubkey,
|
||||
block.slot,
|
||||
block
|
||||
.signing_root
|
||||
.map(SigningRoot::from)
|
||||
.unwrap_or_default(),
|
||||
txn,
|
||||
)?;
|
||||
self.register_validators_in_txn(std::iter::once(pubkey), txn)?;
|
||||
|
||||
// Summary of minimum and maximum messages pre-import.
|
||||
let prev_summary = self.validator_summary(pubkey, txn)?;
|
||||
|
||||
// If the interchange contains a new maximum slot block, import it.
|
||||
let max_block = record.signed_blocks.iter().max_by_key(|b| b.slot);
|
||||
|
||||
if let Some(max_block) = max_block {
|
||||
// Block is relevant if there are no previous blocks, or new block has slot greater than
|
||||
// previous maximum.
|
||||
if prev_summary
|
||||
.max_block_slot
|
||||
.map_or(true, |max_block_slot| max_block.slot > max_block_slot)
|
||||
{
|
||||
self.insert_block_proposal(
|
||||
txn,
|
||||
pubkey,
|
||||
max_block.slot,
|
||||
max_block
|
||||
.signing_root
|
||||
.map(SigningRoot::from)
|
||||
.unwrap_or_default(),
|
||||
)?;
|
||||
|
||||
// Prune the database so that it contains *only* the new block.
|
||||
self.prune_signed_blocks(&record.pubkey, max_block.slot, txn)?;
|
||||
}
|
||||
}
|
||||
|
||||
// Prune blocks less than the min slot from this interchange file.
|
||||
// This ensures we don't sign anything less than the min slot after successful import,
|
||||
// which is signficant if we have imported two files with a "gap" in between.
|
||||
if let Some(new_min_slot) = record.signed_blocks.iter().map(|block| block.slot).min() {
|
||||
self.prune_signed_blocks(&record.pubkey, new_min_slot, txn)?;
|
||||
}
|
||||
|
||||
// Insert all signed attestations.
|
||||
record
|
||||
.signed_attestations
|
||||
.sort_unstable_by_key(|att| (att.source_epoch, att.target_epoch));
|
||||
for attestation in &record.signed_attestations {
|
||||
self.check_and_insert_attestation_signing_root_txn(
|
||||
&record.pubkey,
|
||||
attestation.source_epoch,
|
||||
attestation.target_epoch,
|
||||
attestation
|
||||
.signing_root
|
||||
.map(SigningRoot::from)
|
||||
.unwrap_or_default(),
|
||||
txn,
|
||||
)?;
|
||||
}
|
||||
|
||||
// Prune attestations less than the min target from this interchange file.
|
||||
// See the rationale for blocks above, and the doc comment for `prune_signed_attestations`
|
||||
// for why we don't need to separately prune for the min source.
|
||||
if let Some(new_min_target) = record
|
||||
// Find the attestations with max source and max target. Unless the input contains slashable
|
||||
// data these two attestations should be identical, but we also handle the case where they
|
||||
// are not.
|
||||
let max_source_attestation = record
|
||||
.signed_attestations
|
||||
.iter()
|
||||
.map(|attestation| attestation.target_epoch)
|
||||
.min()
|
||||
.max_by_key(|att| att.source_epoch);
|
||||
let max_target_attestation = record
|
||||
.signed_attestations
|
||||
.iter()
|
||||
.max_by_key(|att| att.target_epoch);
|
||||
|
||||
if let (Some(max_source_att), Some(max_target_att)) =
|
||||
(max_source_attestation, max_target_attestation)
|
||||
{
|
||||
self.prune_signed_attestations(&record.pubkey, new_min_target, txn)?;
|
||||
let source_epoch = max_or(
|
||||
prev_summary.max_attestation_source,
|
||||
max_source_att.source_epoch,
|
||||
);
|
||||
let target_epoch = max_or(
|
||||
prev_summary.max_attestation_target,
|
||||
max_target_att.target_epoch,
|
||||
);
|
||||
let signing_root = SigningRoot::default();
|
||||
|
||||
// Clear existing attestations before insert to avoid running afoul of the target epoch
|
||||
// uniqueness constraint.
|
||||
self.clear_signed_attestations(pubkey, txn)?;
|
||||
self.insert_attestation(txn, pubkey, source_epoch, target_epoch, signing_root)?;
|
||||
}
|
||||
|
||||
let summary = self.validator_summary(&record.pubkey, txn)?;
|
||||
|
||||
Ok(summary)
|
||||
// Check that the summary is consistent with having added the new data.
|
||||
if summary.check_block_consistency(&prev_summary, !record.signed_blocks.is_empty())
|
||||
&& summary.check_attestation_consistency(
|
||||
&prev_summary,
|
||||
!record.signed_attestations.is_empty(),
|
||||
)
|
||||
{
|
||||
Ok(summary)
|
||||
} else {
|
||||
// This should never occur and is indicative of a bug in the import code.
|
||||
Err(NotSafe::ConsistencyError)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn export_interchange_info(
|
||||
@@ -695,7 +716,7 @@ impl SlashingDatabase {
|
||||
.query_and_then(params![], |row| {
|
||||
let validator_pubkey: String = row.get(0)?;
|
||||
let slot = row.get(1)?;
|
||||
let signing_root = Some(hash256_from_row(2, row)?);
|
||||
let signing_root = signing_root_from_row(2, row)?.to_hash256();
|
||||
let signed_block = InterchangeBlock { slot, signing_root };
|
||||
data.entry(validator_pubkey)
|
||||
.or_insert_with(|| (vec![], vec![]))
|
||||
@@ -715,7 +736,7 @@ impl SlashingDatabase {
|
||||
let validator_pubkey: String = row.get(0)?;
|
||||
let source_epoch = row.get(1)?;
|
||||
let target_epoch = row.get(2)?;
|
||||
let signing_root = Some(hash256_from_row(3, row)?);
|
||||
let signing_root = signing_root_from_row(3, row)?.to_hash256();
|
||||
let signed_attestation = InterchangeAttestation {
|
||||
source_epoch,
|
||||
target_epoch,
|
||||
@@ -786,12 +807,6 @@ impl SlashingDatabase {
|
||||
|
||||
/// Remove all attestations for `public_key` with `target < new_min_target`.
|
||||
///
|
||||
/// Pruning every attestation with target less than `new_min_target` also has the effect of
|
||||
/// making the new minimum source the source of the attestation with `target == new_min_target`
|
||||
/// (if any exists). This is exactly what's required for pruning after importing an interchange
|
||||
/// file, whereby we want to update the new minimum source to the min source from the
|
||||
/// interchange.
|
||||
///
|
||||
/// If the `new_min_target` was plucked out of thin air and doesn't necessarily correspond to
|
||||
/// an extant attestation then this function is still safe. It will never delete *all* the
|
||||
/// attestations in the database.
|
||||
@@ -803,7 +818,7 @@ impl SlashingDatabase {
|
||||
) -> Result<(), NotSafe> {
|
||||
let validator_id = self.get_validator_id_in_txn(txn, public_key)?;
|
||||
|
||||
// The following holds:
|
||||
// The following holds, because we never store mutually slashable attestations:
|
||||
// a.target < new_min_target --> a.source <= new_min_source
|
||||
//
|
||||
// The `MAX(target_epoch)` acts as a guard to prevent accidentally clearing the DB.
|
||||
@@ -821,6 +836,25 @@ impl SlashingDatabase {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove all attestations signed by a given `public_key`.
|
||||
///
|
||||
/// This function is incredibly dangerous and should be used with extreme caution. Presently
|
||||
/// we only use it one place: immediately before inserting a new maximum source/maximum target
|
||||
/// attestation. Any future use should take care to respect the database's non-emptiness.
|
||||
fn clear_signed_attestations(
|
||||
&self,
|
||||
public_key: &PublicKeyBytes,
|
||||
txn: &Transaction,
|
||||
) -> Result<(), NotSafe> {
|
||||
let validator_id = self.get_validator_id_in_txn(txn, public_key)?;
|
||||
|
||||
txn.execute(
|
||||
"DELETE FROM signed_attestations WHERE validator_id = ?1",
|
||||
params![validator_id],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Prune the signed attestations table for the given validator keys.
|
||||
pub fn prune_all_signed_attestations<'a>(
|
||||
&self,
|
||||
@@ -844,7 +878,7 @@ impl SlashingDatabase {
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
/// Get a summary of a validator's slashing protection data for consumption by the user.
|
||||
/// Get a summary of a validator's slashing protection data including minimums and maximums.
|
||||
pub fn validator_summary(
|
||||
&self,
|
||||
public_key: &PublicKeyBytes,
|
||||
@@ -896,6 +930,51 @@ pub struct ValidatorSummary {
|
||||
pub max_attestation_target: Option<Epoch>,
|
||||
}
|
||||
|
||||
impl ValidatorSummary {
|
||||
fn check_block_consistency(&self, prev: &Self, imported_blocks: bool) -> bool {
|
||||
if imported_blocks {
|
||||
// Max block slot should be monotonically increasing and non-null.
|
||||
// Minimum should match maximum due to pruning.
|
||||
monotonic(self.max_block_slot, prev.max_block_slot)
|
||||
&& self.min_block_slot == self.max_block_slot
|
||||
} else {
|
||||
// Block slots should be unchanged.
|
||||
prev.min_block_slot == self.min_block_slot && prev.max_block_slot == self.max_block_slot
|
||||
}
|
||||
}
|
||||
|
||||
fn check_attestation_consistency(&self, prev: &Self, imported_attestations: bool) -> bool {
|
||||
if imported_attestations {
|
||||
// Max source and target epochs should be monotically increasing and non-null.
|
||||
// Minimums should match maximums due to pruning.
|
||||
monotonic(self.max_attestation_source, prev.max_attestation_source)
|
||||
&& monotonic(self.max_attestation_target, prev.max_attestation_target)
|
||||
&& self.min_attestation_source == self.max_attestation_source
|
||||
&& self.min_attestation_target == self.max_attestation_target
|
||||
} else {
|
||||
// Attestation epochs should be unchanged.
|
||||
self.min_attestation_source == prev.min_attestation_source
|
||||
&& self.max_attestation_source == prev.max_attestation_source
|
||||
&& self.min_attestation_target == prev.min_attestation_target
|
||||
&& self.max_attestation_target == prev.max_attestation_target
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Take the maximum of `opt_x` and `y`, returning `y` if `opt_x` is `None`.
|
||||
fn max_or<T: Copy + Ord>(opt_x: Option<T>, y: T) -> T {
|
||||
opt_x.map_or(y, |x| std::cmp::max(x, y))
|
||||
}
|
||||
|
||||
/// Check that `new` is `Some` and greater than or equal to prev.
|
||||
///
|
||||
/// If prev is `None` and `new` is `Some` then `true` is returned.
|
||||
fn monotonic<T: PartialOrd>(new: Option<T>, prev: Option<T>) -> bool {
|
||||
new.map_or(false, |new_val| {
|
||||
prev.map_or(true, |prev_val| new_val >= prev_val)
|
||||
})
|
||||
}
|
||||
|
||||
/// The result of importing a single entry from an interchange file.
|
||||
#[derive(Debug)]
|
||||
pub enum InterchangeImportOutcome {
|
||||
@@ -922,13 +1001,13 @@ pub enum InterchangeError {
|
||||
interchange_file: Hash256,
|
||||
client: Hash256,
|
||||
},
|
||||
MinAndMaxInconsistent,
|
||||
MaxInconsistent,
|
||||
SummaryInconsistent,
|
||||
SQLError(String),
|
||||
SQLPoolError(r2d2::Error),
|
||||
SerdeJsonError(serde_json::Error),
|
||||
InvalidPubkey(String),
|
||||
NotSafe(NotSafe),
|
||||
/// One or more records were found to be slashable, so the whole batch was aborted.
|
||||
AtomicBatchAborted(Vec<InterchangeImportOutcome>),
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user