Files
lighthouse/validator_client/slashing_protection/src/test_utils.rs
Michael Sproul 0f57fc9d8e Check slashability of attestations in batches to avoid sequential bottleneck (#8516)
Closes:

- https://github.com/sigp/lighthouse/issues/1914


  Sign attestations prior to checking them against the slashing protection DB. This allows us to avoid the sequential DB checks which are observed in traces here:

- https://github.com/sigp/lighthouse/pull/8508#discussion_r2576686107


Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>

Co-Authored-By: Michael Sproul <michael@sigmaprime.io>

Co-Authored-By: Michael Sproul <michaelsproul@users.noreply.github.com>
2026-01-27 07:56:09 +00:00

216 lines
6.6 KiB
Rust

use crate::slashing_database::CheckSlashability;
use crate::*;
use tempfile::{TempDir, tempdir};
use types::{AttestationData, BeaconBlockHeader, test_utils::generate_deterministic_keypair};
/// Index of the deterministic keypair used when a test does not name a validator explicitly
/// (see `Test::single` and `StreamTest::default`).
pub const DEFAULT_VALIDATOR_INDEX: usize = 0;
/// Domain value assigned to a `Test` unless it is overridden with `Test::with_domain`.
pub const DEFAULT_DOMAIN: Hash256 = Hash256::ZERO;
/// Genesis validators root passed to the interchange export and import calls made by
/// `roundtrip_database`.
pub const DEFAULT_GENESIS_VALIDATORS_ROOT: Hash256 = Hash256::ZERO;
/// Returns the compressed public key of the deterministic keypair generated for `index`.
pub fn pubkey(index: usize) -> PublicKeyBytes {
    let keypair = generate_deterministic_keypair(index);
    keypair.pk.compress()
}
/// A single test case: a piece of `data` checked on behalf of one validator, together with the
/// result that check is expected to produce.
pub struct Test<T> {
/// Public key of the validator the data is checked for.
pubkey: PublicKeyBytes,
/// The payload handed to the slashing database (attestation data or block header in this file).
data: T,
/// Domain passed alongside `data` to the slashing database check.
domain: Hash256,
/// Result the slashing database check is asserted to equal.
expected: Result<Safe, NotSafe>,
}
impl<T> Test<T> {
pub fn single(data: T) -> Self {
Self::with_pubkey(pubkey(DEFAULT_VALIDATOR_INDEX), data)
}
pub fn with_pubkey(pubkey: PublicKeyBytes, data: T) -> Self {
Self {
pubkey,
data,
domain: DEFAULT_DOMAIN,
expected: Ok(Safe::Valid),
}
}
pub fn with_domain(mut self, domain: Hash256) -> Self {
self.domain = domain;
self
}
pub fn expect_result(mut self, result: Result<Safe, NotSafe>) -> Self {
self.expected = result;
self
}
pub fn expect_invalid_att(self, error: InvalidAttestation) -> Self {
self.expect_result(Err(NotSafe::InvalidAttestation(error)))
}
pub fn expect_invalid_block(self, error: InvalidBlock) -> Self {
self.expect_result(Err(NotSafe::InvalidBlock(error)))
}
pub fn expect_same_data(self) -> Self {
self.expect_result(Ok(Safe::SameData))
}
}
/// An ordered sequence of test cases that is run against a freshly created slashing database
/// after registering `registered_validators`.
pub struct StreamTest<T> {
/// Validators to register.
pub registered_validators: Vec<PublicKeyBytes>,
/// Vector of cases and the value expected when calling `check_and_insert_X`.
pub cases: Vec<Test<T>>,
}
impl<T> Default for StreamTest<T> {
fn default() -> Self {
Self {
registered_validators: vec![pubkey(DEFAULT_VALIDATOR_INDEX)],
cases: vec![],
}
}
}
impl StreamTest<AttestationData> {
    /// Runs the cases twice: once one-by-one (`run_solo`) and once as a batch (`run_batched`).
    pub fn run(&self) {
        self.run_solo();
        self.run_batched();
    }

    /// Run the test with every attestation processed individually.
    ///
    /// Each case is checked in its own transaction, in order, and its result is asserted
    /// against the case's expected value. The database is round-tripped afterwards.
    pub fn run_solo(&self) {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("slashing_protection.sqlite");
        let slashing_db = SlashingDatabase::create(&db_path).unwrap();

        self.registered_validators.iter().for_each(|validator| {
            slashing_db.register_validator(*validator).unwrap();
        });
        check_registration_invariants(&slashing_db, &self.registered_validators);

        for (index, case) in self.cases.iter().enumerate() {
            let outcome = slashing_db.with_transaction(|txn| {
                slashing_db.check_and_insert_attestation(
                    &case.pubkey,
                    &case.data,
                    case.domain,
                    txn,
                )
            });
            assert_eq!(
                outcome, case.expected,
                "attestation {} not processed as expected",
                index
            );
        }

        let expect_empty = self.registered_validators.is_empty();
        roundtrip_database(&dir, &slashing_db, expect_empty);
    }

    /// Run the test with all attestations processed by the slashing DB as part of a batch.
    ///
    /// Every case is submitted with `CheckSlashability::Yes`; the per-attestation results are
    /// then compared, in order, with each case's expected value. The database is round-tripped
    /// afterwards.
    pub fn run_batched(&self) {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("slashing_protection.sqlite");
        let slashing_db = SlashingDatabase::create(&db_path).unwrap();

        self.registered_validators.iter().for_each(|validator| {
            slashing_db.register_validator(*validator).unwrap();
        });
        check_registration_invariants(&slashing_db, &self.registered_validators);

        let mut attestations_to_check = Vec::with_capacity(self.cases.len());
        for case in &self.cases {
            attestations_to_check.push((
                &case.data,
                &case.pubkey,
                case.domain,
                CheckSlashability::Yes,
            ));
        }

        let results = slashing_db
            .check_and_insert_attestations(&attestations_to_check)
            .unwrap();
        // One result per submitted case; otherwise the zip below would silently truncate.
        assert_eq!(results.len(), self.cases.len());

        for (index, (case, result)) in self.cases.iter().zip(results).enumerate() {
            assert_eq!(
                result, case.expected,
                "attestation {} not processed as expected",
                index
            );
        }

        let expect_empty = self.registered_validators.is_empty();
        roundtrip_database(&dir, &slashing_db, expect_empty);
    }
}
impl StreamTest<BeaconBlockHeader> {
    /// Runs every block-header case, in order, against a freshly created slashing database.
    ///
    /// The registered validators are inserted first and the registration invariants checked.
    /// Each case is then passed to `check_and_insert_block_proposal` and its result asserted
    /// against the case's expected value. Finally the database is round-tripped through the
    /// interchange export/import.
    ///
    /// # Panics
    ///
    /// Panics if database setup fails or if any case produces an unexpected result.
    pub fn run(&self) {
        let dir = tempdir().unwrap();
        let slashing_db_file = dir.path().join("slashing_protection.sqlite");
        let slashing_db = SlashingDatabase::create(&slashing_db_file).unwrap();

        for pubkey in &self.registered_validators {
            slashing_db.register_validator(*pubkey).unwrap();
        }
        check_registration_invariants(&slashing_db, &self.registered_validators);

        for (i, test) in self.cases.iter().enumerate() {
            // The failure message names block proposals (not attestations) so that a failing
            // case points at the right kind of test.
            assert_eq!(
                slashing_db.check_and_insert_block_proposal(&test.pubkey, &test.data, test.domain),
                test.expected,
                "block proposal {} not processed as expected",
                i
            );
        }

        roundtrip_database(&dir, &slashing_db, self.registered_validators.is_empty());
    }
}
/// Round-trips `db` through the interchange format and checks the data survives.
///
/// The database is exported, imported into a new database created in `dir`, and exported again.
/// Both exports are minified before being compared, in order to be compatible with the implicit
/// minification done on import. Lastly, the emptiness of the first export is asserted to equal
/// `is_empty`.
fn roundtrip_database(dir: &TempDir, db: &SlashingDatabase, is_empty: bool) {
    let original_export = db
        .export_all_interchange_info(DEFAULT_GENESIS_VALIDATORS_ROOT)
        .unwrap();

    let roundtrip_path = dir.path().join("roundtrip_slashing_protection.sqlite");
    let roundtrip_db = SlashingDatabase::create(&roundtrip_path).unwrap();
    roundtrip_db
        .import_interchange_info(original_export.clone(), DEFAULT_GENESIS_VALIDATORS_ROOT)
        .unwrap();

    let second_export = roundtrip_db
        .export_all_interchange_info(DEFAULT_GENESIS_VALIDATORS_ROOT)
        .unwrap();

    let minified_original = original_export.minify().unwrap();
    let minified_second = second_export.minify().unwrap();
    assert!(minified_original.equiv(&minified_second));

    assert_eq!(is_empty, original_export.is_empty());
}
/// Asserts that the database agrees with `registered_validators`.
///
/// First `check_validator_registrations` must succeed for the given keys; then the public keys
/// listed by `list_all_registered_validators` must equal `registered_validators`, in the same
/// order.
fn check_registration_invariants(
    slashing_db: &SlashingDatabase,
    registered_validators: &[PublicKeyBytes],
) {
    slashing_db
        .check_validator_registrations(registered_validators.iter())
        .unwrap();

    let rows = slashing_db
        .with_transaction(|txn| slashing_db.list_all_registered_validators(txn))
        .unwrap();

    // Keep only the public key from each listed row, preserving the listing order.
    let mut registered_list = Vec::new();
    for (_, validator_pubkey) in rows {
        registered_list.push(validator_pubkey);
    }

    assert_eq!(registered_validators, registered_list);
}