Implement Slashing Protection (#1116)

* Implement slashing protection

Roll-up of #588 with some conflicts resolved

* WIP improvements

* Require slot uniqueness for blocks (rather than epochs)
* Native DB support for Slot and Epoch
* Simplify surrounding/surrounded-by queries

* Implement unified slashing protection database

A single SQL database saves on open file descriptors.

* Make slashing protection concurrency safe.

Revive tests, add parallel tests.

* Some simplifications

* Auto-registration, test clean-ups

* More tests, clean-ups, hardening

* Fix comments in BLS

* Optimise bulk validator registration

* Delete outdated tests

* Use bundled SQLite in slashing protection

* Auto-register validators in simulation

* Use real signing_root in slashing protection

* Update book for --auto-register

* Refine log messages and help flags

* Correct typo in Cargo.toml authors

* Fix merge conflicts

* Safer error handling in sqlite slot/epoch

* Address review comments

* Add attestation test mutating block root

Co-authored-by: pscott <scottpiriou@gmail.com>
This commit is contained in:
Michael Sproul
2020-05-18 16:25:16 +10:00
committed by GitHub
parent 90b3953dda
commit 2d8e2dd7f5
30 changed files with 1720 additions and 52 deletions

View File

@@ -0,0 +1,17 @@
[package]
name = "slashing_protection"
version = "0.1.0"
authors = ["Michael Sproul <michael@sigmaprime.io>", "pscott <scottpiriou@gmail.com>"]
edition = "2018"
[dependencies]
tempfile = "3.1.0"
types = { path = "../../eth2/types" }
tree_hash = { path = "../../eth2/utils/tree_hash" }
rusqlite = { version = "0.22.0", features = ["bundled"] }
r2d2 = "0.8.8"
r2d2_sqlite = "0.15"
parking_lot = "0.9.0"
[dev-dependencies]
rayon = "1.3.0"

View File

@@ -0,0 +1,389 @@
#![cfg(test)]
use crate::test_utils::*;
use crate::*;
use types::{AttestationData, Checkpoint, Epoch, Hash256, Slot};
pub fn build_checkpoint(epoch_num: u64) -> Checkpoint {
Checkpoint {
epoch: Epoch::from(epoch_num),
root: Hash256::zero(),
}
}
pub fn attestation_data_builder(source: u64, target: u64) -> AttestationData {
let source = build_checkpoint(source);
let target = build_checkpoint(target);
let index = 0u64;
let slot = Slot::from(0u64);
AttestationData {
slot,
index,
beacon_block_root: Hash256::zero(),
source,
target,
}
}
/// Create a signed attestation from `attestation`, assuming the default domain.
fn signed_att(attestation: &AttestationData) -> SignedAttestation {
SignedAttestation::from_attestation(attestation, DEFAULT_DOMAIN)
}
#[test]
fn valid_empty_history() {
StreamTest {
cases: vec![Test::single(attestation_data_builder(2, 3))],
..StreamTest::default()
}
.run()
}
#[test]
fn valid_genesis() {
StreamTest {
cases: vec![Test::single(attestation_data_builder(0, 0))],
..StreamTest::default()
}
.run()
}
#[test]
fn valid_out_of_order_attestation() {
StreamTest {
cases: vec![
Test::single(attestation_data_builder(0, 3)),
Test::single(attestation_data_builder(2, 5)),
Test::single(attestation_data_builder(1, 4)),
],
..StreamTest::default()
}
.run()
}
#[test]
fn valid_repeat_attestation() {
StreamTest {
cases: vec![
Test::single(attestation_data_builder(0, 1)),
Test::single(attestation_data_builder(0, 1)).expect_same_data(),
],
..StreamTest::default()
}
.run()
}
#[test]
fn valid_source_from_first_entry() {
StreamTest {
cases: vec![
Test::single(attestation_data_builder(6, 7)),
Test::single(attestation_data_builder(6, 8)),
],
..StreamTest::default()
}
.run()
}
#[test]
fn valid_multiple_validators_double_vote() {
StreamTest {
registered_validators: vec![pubkey(0), pubkey(1)],
cases: vec![
Test::with_pubkey(pubkey(0), attestation_data_builder(0, 1)),
Test::with_pubkey(pubkey(1), attestation_data_builder(0, 1)),
],
}
.run()
}
#[test]
fn valid_vote_chain_repeat_first() {
StreamTest {
cases: vec![
Test::single(attestation_data_builder(0, 1)),
Test::single(attestation_data_builder(1, 2)),
Test::single(attestation_data_builder(2, 3)),
Test::single(attestation_data_builder(0, 1)).expect_same_data(),
],
..StreamTest::default()
}
.run()
}
#[test]
fn valid_vote_chain_repeat_middle() {
StreamTest {
cases: vec![
Test::single(attestation_data_builder(0, 1)),
Test::single(attestation_data_builder(1, 2)),
Test::single(attestation_data_builder(2, 3)),
Test::single(attestation_data_builder(1, 2)).expect_same_data(),
],
..StreamTest::default()
}
.run()
}
#[test]
fn valid_vote_chain_repeat_last() {
StreamTest {
cases: vec![
Test::single(attestation_data_builder(0, 1)),
Test::single(attestation_data_builder(1, 2)),
Test::single(attestation_data_builder(2, 3)),
Test::single(attestation_data_builder(2, 3)).expect_same_data(),
],
..StreamTest::default()
}
.run()
}
#[test]
fn valid_multiple_validators_not_surrounding() {
// Attestations that would be problematic if they came from the same validator, but are OK
// coming from different validators.
StreamTest {
registered_validators: vec![pubkey(0), pubkey(1)],
cases: vec![
Test::with_pubkey(pubkey(0), attestation_data_builder(0, 10)),
Test::with_pubkey(pubkey(0), attestation_data_builder(10, 20)),
Test::with_pubkey(pubkey(1), attestation_data_builder(1, 9)),
Test::with_pubkey(pubkey(1), attestation_data_builder(9, 21)),
],
}
.run()
}
#[test]
fn invalid_source_exceeds_target() {
StreamTest {
cases: vec![Test::single(attestation_data_builder(1, 0))
.expect_invalid_att(InvalidAttestation::SourceExceedsTarget)],
..StreamTest::default()
}
.run()
}
#[test]
fn invalid_unregistered_validator() {
StreamTest {
registered_validators: vec![],
cases: vec![
Test::single(attestation_data_builder(2, 3)).expect_result(Err(
NotSafe::UnregisteredValidator(pubkey(DEFAULT_VALIDATOR_INDEX)),
)),
],
}
.run()
}
#[test]
fn invalid_double_vote_diff_source() {
let first = attestation_data_builder(0, 2);
StreamTest {
cases: vec![
Test::single(first.clone()),
Test::single(attestation_data_builder(1, 2))
.expect_invalid_att(InvalidAttestation::DoubleVote(signed_att(&first))),
],
..StreamTest::default()
}
.run()
}
#[test]
fn invalid_double_vote_diff_target() {
let first = attestation_data_builder(0, 2);
let mut second = attestation_data_builder(0, 2);
second.target.root = Hash256::random();
assert_ne!(first, second);
StreamTest {
cases: vec![
Test::single(first.clone()),
Test::single(second)
.expect_invalid_att(InvalidAttestation::DoubleVote(signed_att(&first))),
],
..StreamTest::default()
}
.run()
}
#[test]
fn invalid_double_vote_diff_data() {
let first = attestation_data_builder(0, 2);
let mut second = attestation_data_builder(0, 2);
second.beacon_block_root = Hash256::random();
assert_ne!(first, second);
StreamTest {
cases: vec![
Test::single(first.clone()),
Test::single(second)
.expect_invalid_att(InvalidAttestation::DoubleVote(signed_att(&first))),
],
..StreamTest::default()
}
.run()
}
#[test]
fn invalid_double_vote_diff_domain() {
let first = attestation_data_builder(0, 2);
let domain1 = Hash256::from_low_u64_le(1);
let domain2 = Hash256::from_low_u64_le(2);
StreamTest {
cases: vec![
Test::single(first.clone()).with_domain(domain1),
Test::single(first.clone())
.with_domain(domain2)
.expect_invalid_att(InvalidAttestation::DoubleVote(
SignedAttestation::from_attestation(&first, domain1),
)),
],
..StreamTest::default()
}
.run()
}
#[test]
fn invalid_double_vote_diff_source_multi() {
let first = attestation_data_builder(0, 2);
let second = attestation_data_builder(1, 3);
let third = attestation_data_builder(2, 4);
StreamTest {
cases: vec![
Test::single(first.clone()),
Test::single(second.clone()),
Test::single(third.clone()),
Test::single(attestation_data_builder(1, 2))
.expect_invalid_att(InvalidAttestation::DoubleVote(signed_att(&first))),
Test::single(attestation_data_builder(2, 3))
.expect_invalid_att(InvalidAttestation::DoubleVote(signed_att(&second))),
Test::single(attestation_data_builder(3, 4))
.expect_invalid_att(InvalidAttestation::DoubleVote(signed_att(&third))),
],
..StreamTest::default()
}
.run()
}
#[test]
fn invalid_surrounding_single() {
let first = attestation_data_builder(2, 3);
let second = attestation_data_builder(4, 5);
let third = attestation_data_builder(6, 7);
StreamTest {
cases: vec![
Test::single(first.clone()),
Test::single(second.clone()),
Test::single(third.clone()),
Test::single(attestation_data_builder(1, 4)).expect_invalid_att(
InvalidAttestation::NewSurroundsPrev {
prev: signed_att(&first),
},
),
Test::single(attestation_data_builder(3, 6)).expect_invalid_att(
InvalidAttestation::NewSurroundsPrev {
prev: signed_att(&second),
},
),
Test::single(attestation_data_builder(5, 8)).expect_invalid_att(
InvalidAttestation::NewSurroundsPrev {
prev: signed_att(&third),
},
),
],
..StreamTest::default()
}
.run()
}
#[test]
fn invalid_surrounding_from_first_source() {
let first = attestation_data_builder(2, 3);
let second = attestation_data_builder(3, 4);
StreamTest {
cases: vec![
Test::single(first.clone()),
Test::single(second.clone()),
Test::single(attestation_data_builder(2, 5)).expect_invalid_att(
InvalidAttestation::NewSurroundsPrev {
prev: signed_att(&second),
},
),
],
..StreamTest::default()
}
.run()
}
#[test]
fn invalid_surrounding_multiple_votes() {
let first = attestation_data_builder(0, 1);
let second = attestation_data_builder(1, 2);
let third = attestation_data_builder(2, 3);
StreamTest {
cases: vec![
Test::single(first.clone()),
Test::single(second.clone()),
Test::single(third.clone()),
Test::single(attestation_data_builder(0, 4)).expect_invalid_att(
InvalidAttestation::NewSurroundsPrev {
prev: signed_att(&third),
},
),
],
..StreamTest::default()
}
.run()
}
#[test]
fn invalid_prev_surrounds_new() {
let first = attestation_data_builder(0, 7);
StreamTest {
cases: vec![
Test::single(first.clone()),
Test::single(attestation_data_builder(1, 6)).expect_invalid_att(
InvalidAttestation::PrevSurroundsNew {
prev: signed_att(&first),
},
),
],
..StreamTest::default()
}
.run()
}
#[test]
fn invalid_prev_surrounds_new_multiple() {
let first = attestation_data_builder(0, 4);
let second = attestation_data_builder(1, 7);
let third = attestation_data_builder(8, 10);
StreamTest {
cases: vec![
Test::single(first.clone()),
Test::single(second.clone()),
Test::single(third.clone()),
Test::single(attestation_data_builder(9, 9)).expect_invalid_att(
InvalidAttestation::PrevSurroundsNew {
prev: signed_att(&third),
},
),
Test::single(attestation_data_builder(2, 6)).expect_invalid_att(
InvalidAttestation::PrevSurroundsNew {
prev: signed_att(&second),
},
),
Test::single(attestation_data_builder(1, 2)).expect_invalid_att(
InvalidAttestation::PrevSurroundsNew {
prev: signed_att(&first),
},
),
],
..StreamTest::default()
}
.run()
}

View File

@@ -0,0 +1,124 @@
#![cfg(test)]
use super::*;
use crate::test_utils::*;
use types::{BeaconBlockHeader, Hash256, Slot};
pub fn block(slot: u64) -> BeaconBlockHeader {
BeaconBlockHeader {
slot: Slot::new(slot),
proposer_index: 0,
parent_root: Hash256::random(),
state_root: Hash256::random(),
body_root: Hash256::random(),
}
}
#[test]
fn valid_empty_history() {
StreamTest {
cases: vec![Test::single(block(1))],
..StreamTest::default()
}
.run()
}
#[test]
fn valid_blocks() {
StreamTest {
cases: vec![
Test::single(block(1)),
Test::single(block(2)),
Test::single(block(3)),
Test::single(block(4)),
],
..StreamTest::default()
}
.run()
}
#[test]
fn valid_same_block() {
let block = block(100);
StreamTest {
cases: vec![
Test::single(block.clone()),
Test::single(block).expect_same_data(),
],
..StreamTest::default()
}
.run()
}
#[test]
fn valid_same_slot_different_validator() {
StreamTest {
registered_validators: vec![pubkey(0), pubkey(1)],
cases: vec![
Test::with_pubkey(pubkey(0), block(100)),
Test::with_pubkey(pubkey(1), block(100)),
],
}
.run()
}
#[test]
fn valid_same_block_different_validator() {
let block = block(100);
StreamTest {
registered_validators: vec![pubkey(0), pubkey(1)],
cases: vec![
Test::with_pubkey(pubkey(0), block.clone()),
Test::with_pubkey(pubkey(1), block.clone()),
],
}
.run()
}
#[test]
fn invalid_double_block_proposal() {
let first_block = block(1);
StreamTest {
cases: vec![
Test::single(first_block.clone()),
Test::single(block(1)).expect_invalid_block(InvalidBlock::DoubleBlockProposal(
SignedBlock::from_header(&first_block, DEFAULT_DOMAIN),
)),
],
..StreamTest::default()
}
.run()
}
#[test]
fn invalid_double_block_proposal_diff_domain() {
let first_block = block(1);
let domain1 = Hash256::from_low_u64_be(1);
let domain2 = Hash256::from_low_u64_be(2);
StreamTest {
cases: vec![
Test::single(first_block.clone()).with_domain(domain1),
Test::single(first_block.clone())
.with_domain(domain2)
.expect_invalid_block(InvalidBlock::DoubleBlockProposal(SignedBlock::from_header(
&first_block,
domain1,
))),
],
..StreamTest::default()
}
.run()
}
#[test]
fn invalid_unregistered_validator() {
StreamTest {
registered_validators: vec![],
cases: vec![
Test::single(block(0)).expect_result(Err(NotSafe::UnregisteredValidator(pubkey(
DEFAULT_VALIDATOR_INDEX,
)))),
],
}
.run()
}

View File

@@ -0,0 +1,77 @@
mod attestation_tests;
mod block_tests;
mod parallel_tests;
mod signed_attestation;
mod signed_block;
mod slashing_database;
mod test_utils;
pub use crate::signed_attestation::{InvalidAttestation, SignedAttestation};
pub use crate::signed_block::{InvalidBlock, SignedBlock};
pub use crate::slashing_database::SlashingDatabase;
use rusqlite::Error as SQLError;
use std::io::{Error as IOError, ErrorKind};
use std::string::ToString;
use types::{Hash256, PublicKey};
/// The attestation or block is not safe to sign.
///
/// This could be because it's slashable, or because an error occurred.
#[derive(PartialEq, Debug)]
pub enum NotSafe {
UnregisteredValidator(PublicKey),
InvalidBlock(InvalidBlock),
InvalidAttestation(InvalidAttestation),
IOError(ErrorKind),
SQLError(String),
SQLPoolError(String),
}
/// The attestation or block is safe to sign, and will not cause the signer to be slashed.
#[derive(PartialEq, Debug)]
pub enum Safe {
/// Casting the exact same data (block or attestation) twice is never slashable.
SameData,
/// Incoming data is safe from slashing, and is not a duplicate.
Valid,
}
/// Safely parse a `Hash256` from the given `column` of an SQLite `row`.
fn hash256_from_row(column: usize, row: &rusqlite::Row) -> rusqlite::Result<Hash256> {
use rusqlite::{types::Type, Error};
let bytes: Vec<u8> = row.get(column)?;
if bytes.len() == 32 {
Ok(Hash256::from_slice(&bytes))
} else {
Err(Error::FromSqlConversionFailure(
column,
Type::Blob,
Box::from(format!("Invalid length for Hash256: {}", bytes.len())),
))
}
}
impl From<IOError> for NotSafe {
fn from(error: IOError) -> NotSafe {
NotSafe::IOError(error.kind())
}
}
impl From<SQLError> for NotSafe {
fn from(error: SQLError) -> NotSafe {
NotSafe::SQLError(error.to_string())
}
}
impl From<r2d2::Error> for NotSafe {
fn from(error: r2d2::Error) -> Self {
NotSafe::SQLPoolError(format!("{:?}", error))
}
}
impl ToString for NotSafe {
fn to_string(&self) -> String {
format!("{:?}", self)
}
}

View File

@@ -0,0 +1,82 @@
//! Tests that stress the concurrency safety of the slashing protection DB.
#![cfg(test)]
use crate::attestation_tests::attestation_data_builder;
use crate::block_tests::block;
use crate::test_utils::*;
use crate::*;
use rayon::prelude::*;
use tempfile::tempdir;
#[test]
fn block_same_slot() {
let dir = tempdir().unwrap();
let slashing_db_file = dir.path().join("slashing_protection.sqlite");
let slashing_db = SlashingDatabase::create(&slashing_db_file).unwrap();
let pk = pubkey(0);
slashing_db.register_validator(&pk).unwrap();
// A stream of blocks all with the same slot.
let num_blocks = 10;
let results = (0..num_blocks)
.into_par_iter()
.map(|_| slashing_db.check_and_insert_block_proposal(&pk, &block(1), DEFAULT_DOMAIN))
.collect::<Vec<_>>();
let num_successes = results.iter().filter(|res| res.is_ok()).count();
assert_eq!(num_successes, 1);
}
#[test]
fn attestation_same_target() {
let dir = tempdir().unwrap();
let slashing_db_file = dir.path().join("slashing_protection.sqlite");
let slashing_db = SlashingDatabase::create(&slashing_db_file).unwrap();
let pk = pubkey(0);
slashing_db.register_validator(&pk).unwrap();
// A stream of attestations all with the same target.
let num_attestations = 10;
let results = (0..num_attestations)
.into_par_iter()
.map(|i| {
slashing_db.check_and_insert_attestation(
&pk,
&attestation_data_builder(i, num_attestations),
DEFAULT_DOMAIN,
)
})
.collect::<Vec<_>>();
let num_successes = results.iter().filter(|res| res.is_ok()).count();
assert_eq!(num_successes, 1);
}
#[test]
fn attestation_surround_fest() {
let dir = tempdir().unwrap();
let slashing_db_file = dir.path().join("slashing_protection.sqlite");
let slashing_db = SlashingDatabase::create(&slashing_db_file).unwrap();
let pk = pubkey(0);
slashing_db.register_validator(&pk).unwrap();
// A stream of attestations that all surround each other.
let num_attestations = 10;
let results = (0..num_attestations)
.into_par_iter()
.map(|i| {
let att = attestation_data_builder(i, 2 * num_attestations - i);
slashing_db.check_and_insert_attestation(&pk, &att, DEFAULT_DOMAIN)
})
.collect::<Vec<_>>();
let num_successes = results.iter().filter(|res| res.is_ok()).count();
assert_eq!(num_successes, 1);
}

View File

@@ -0,0 +1,50 @@
use crate::hash256_from_row;
use types::{AttestationData, Epoch, Hash256, SignedRoot};
/// An attestation that has previously been signed.
#[derive(Clone, Debug, PartialEq)]
pub struct SignedAttestation {
pub source_epoch: Epoch,
pub target_epoch: Epoch,
pub signing_root: Hash256,
}
/// Reasons why an attestation may be slashable (or invalid).
#[derive(PartialEq, Debug)]
pub enum InvalidAttestation {
/// The attestation has the same target epoch as an attestation from the DB (enclosed).
DoubleVote(SignedAttestation),
/// The attestation surrounds an existing attestation from the database (`prev`).
NewSurroundsPrev { prev: SignedAttestation },
/// The attestation is surrounded by an existing attestation from the database (`prev`).
PrevSurroundsNew { prev: SignedAttestation },
/// The attestation is invalid because its source epoch is greater than its target epoch.
SourceExceedsTarget,
}
impl SignedAttestation {
pub fn new(source_epoch: Epoch, target_epoch: Epoch, signing_root: Hash256) -> Self {
Self {
source_epoch,
target_epoch,
signing_root,
}
}
/// Create a `SignedAttestation` from attestation data and a domain.
pub fn from_attestation(attestation: &AttestationData, domain: Hash256) -> Self {
Self {
source_epoch: attestation.source.epoch,
target_epoch: attestation.target.epoch,
signing_root: attestation.signing_root(domain),
}
}
/// Create a `SignedAttestation` from an SQLite row of `(source, target, signing_root)`.
pub fn from_row(row: &rusqlite::Row) -> rusqlite::Result<Self> {
let source = row.get(0)?;
let target = row.get(1)?;
let signing_root = hash256_from_row(2, row)?;
Ok(SignedAttestation::new(source, target, signing_root))
}
}

View File

@@ -0,0 +1,35 @@
use crate::hash256_from_row;
use types::{BeaconBlockHeader, Hash256, SignedRoot, Slot};
/// A block that has previously been signed.
#[derive(Clone, Debug, PartialEq)]
pub struct SignedBlock {
pub slot: Slot,
pub signing_root: Hash256,
}
/// Reasons why a block may be slashable.
#[derive(PartialEq, Debug)]
pub enum InvalidBlock {
DoubleBlockProposal(SignedBlock),
}
impl SignedBlock {
pub fn new(slot: Slot, signing_root: Hash256) -> Self {
Self { slot, signing_root }
}
pub fn from_header(header: &BeaconBlockHeader, domain: Hash256) -> Self {
Self {
slot: header.slot,
signing_root: header.signing_root(domain),
}
}
/// Parse an SQLite row of `(slot, signing_root)`.
pub fn from_row(row: &rusqlite::Row) -> rusqlite::Result<Self> {
let slot = row.get(0)?;
let signing_root = hash256_from_row(1, row)?;
Ok(SignedBlock { slot, signing_root })
}
}

View File

@@ -0,0 +1,471 @@
use crate::signed_attestation::InvalidAttestation;
use crate::signed_block::InvalidBlock;
use crate::{NotSafe, Safe, SignedAttestation, SignedBlock};
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::{params, OptionalExtension, Transaction, TransactionBehavior};
use std::fs::{File, OpenOptions};
use std::path::Path;
use std::time::Duration;
use types::{AttestationData, BeaconBlockHeader, Hash256, PublicKey, SignedRoot};
type Pool = r2d2::Pool<SqliteConnectionManager>;
/// We set the pool size to 1 for compatibility with locking_mode=EXCLUSIVE.
///
/// This is perhaps overkill in the presence of exclusive transactions, but has
/// the added bonus of preventing other processes from trying to use our slashing database.
pub const POOL_SIZE: u32 = 1;
#[cfg(not(test))]
pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
#[cfg(test)]
pub const CONNECTION_TIMEOUT: Duration = Duration::from_millis(100);
#[derive(Debug, Clone)]
pub struct SlashingDatabase {
conn_pool: Pool,
}
impl SlashingDatabase {
/// Open an existing database at the given `path`, or create one if none exists.
pub fn open_or_create(path: &Path) -> Result<Self, NotSafe> {
if path.exists() {
Self::open(path)
} else {
Self::create(path)
}
}
/// Create a slashing database at the given path.
///
/// Error if a database (or any file) already exists at `path`.
pub fn create(path: &Path) -> Result<Self, NotSafe> {
let file = OpenOptions::new()
.write(true)
.read(true)
.create_new(true)
.open(path)?;
Self::set_db_file_permissions(&file)?;
let conn_pool = Self::open_conn_pool(path)?;
let conn = conn_pool.get()?;
conn.execute(
"CREATE TABLE validators (
id INTEGER PRIMARY KEY,
public_key BLOB NOT NULL
)",
params![],
)?;
conn.execute(
"CREATE TABLE signed_blocks (
validator_id INTEGER NOT NULL,
slot INTEGER NOT NULL,
signing_root BLOB NOT NULL,
FOREIGN KEY(validator_id) REFERENCES validators(id)
UNIQUE (validator_id, slot)
)",
params![],
)?;
conn.execute(
"CREATE TABLE signed_attestations (
validator_id INTEGER,
source_epoch INTEGER NOT NULL,
target_epoch INTEGER NOT NULL,
signing_root BLOB NOT NULL,
FOREIGN KEY(validator_id) REFERENCES validators(id)
UNIQUE (validator_id, target_epoch)
)",
params![],
)?;
Ok(Self { conn_pool })
}
/// Open an existing `SlashingDatabase` from disk.
pub fn open(path: &Path) -> Result<Self, NotSafe> {
let conn_pool = Self::open_conn_pool(&path)?;
Ok(Self { conn_pool })
}
/// Open a new connection pool with all of the necessary settings and tweaks.
fn open_conn_pool(path: &Path) -> Result<Pool, NotSafe> {
let manager = SqliteConnectionManager::file(path)
.with_flags(rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE)
.with_init(Self::apply_pragmas);
let conn_pool = Pool::builder()
.max_size(POOL_SIZE)
.connection_timeout(CONNECTION_TIMEOUT)
.build(manager)
.map_err(|e| NotSafe::SQLError(format!("Unable to open database: {:?}", e)))?;
Ok(conn_pool)
}
/// Apply the necessary settings to an SQLite connection.
///
/// Most importantly, put the database into exclusive locking mode, so that threads are forced
/// to serialise all DB access (to prevent slashable data being checked and signed in parallel).
/// The exclusive locking mode also has the benefit of applying to other processes, so multiple
/// Lighthouse processes trying to access the same database will also be blocked.
fn apply_pragmas(conn: &mut rusqlite::Connection) -> Result<(), rusqlite::Error> {
conn.pragma_update(None, "foreign_keys", &true)?;
conn.pragma_update(None, "locking_mode", &"EXCLUSIVE")?;
Ok(())
}
/// Set the database file to readable and writable only by its owner (0600).
#[cfg(unix)]
fn set_db_file_permissions(file: &File) -> Result<(), NotSafe> {
use std::os::unix::fs::PermissionsExt;
let mut perm = file.metadata()?.permissions();
perm.set_mode(0o600);
file.set_permissions(perm)?;
Ok(())
}
// TODO: add support for Windows ACLs
#[cfg(windows)]
fn set_db_file_permissions(file: &File) -> Result<(), NotSafe> {}
/// Register a validator with the slashing protection database.
///
/// This allows the validator to record their signatures in the database, and check
/// for slashings.
pub fn register_validator(&self, validator_pk: &PublicKey) -> Result<(), NotSafe> {
self.register_validators(std::iter::once(validator_pk))
}
/// Register multiple validators with the slashing protection database.
pub fn register_validators<'a>(
&self,
public_keys: impl Iterator<Item = &'a PublicKey>,
) -> Result<(), NotSafe> {
let mut conn = self.conn_pool.get()?;
let txn = conn.transaction()?;
{
let mut stmt = txn.prepare("INSERT INTO validators (public_key) VALUES (?1)")?;
for pubkey in public_keys {
stmt.execute(&[pubkey.as_hex_string()])?;
}
}
txn.commit()?;
Ok(())
}
/// Get the database-internal ID for a validator.
///
/// This is NOT the same as a validator index, and depends on the ordering that validators
/// are registered with the slashing protection database (and may vary between machines).
fn get_validator_id(txn: &Transaction, public_key: &PublicKey) -> Result<i64, NotSafe> {
txn.query_row(
"SELECT id FROM validators WHERE public_key = ?1",
params![&public_key.as_hex_string()],
|row| row.get(0),
)
.optional()?
.ok_or_else(|| NotSafe::UnregisteredValidator(public_key.clone()))
}
/// Check a block proposal from `validator_pubkey` for slash safety.
fn check_block_proposal(
&self,
txn: &Transaction,
validator_pubkey: &PublicKey,
block_header: &BeaconBlockHeader,
domain: Hash256,
) -> Result<Safe, NotSafe> {
let validator_id = Self::get_validator_id(txn, validator_pubkey)?;
let existing_block = txn
.prepare(
"SELECT slot, signing_root
FROM signed_blocks
WHERE validator_id = ?1 AND slot = ?2",
)?
.query_row(
params![validator_id, block_header.slot],
SignedBlock::from_row,
)
.optional()?;
if let Some(existing_block) = existing_block {
if existing_block.signing_root == block_header.signing_root(domain) {
// Same slot and same hash -> we're re-broadcasting a previously signed block
Ok(Safe::SameData)
} else {
// Same epoch but not the same hash -> it's a DoubleBlockProposal
Err(NotSafe::InvalidBlock(InvalidBlock::DoubleBlockProposal(
existing_block,
)))
}
} else {
Ok(Safe::Valid)
}
}
/// Check an attestation from `validator_pubkey` for slash safety.
fn check_attestation(
&self,
txn: &Transaction,
validator_pubkey: &PublicKey,
attestation: &AttestationData,
domain: Hash256,
) -> Result<Safe, NotSafe> {
let att_source_epoch = attestation.source.epoch;
let att_target_epoch = attestation.target.epoch;
// Although it's not required to avoid slashing, we disallow attestations
// which are obviously invalid by virtue of their source epoch exceeding their target.
if att_source_epoch > att_target_epoch {
return Err(NotSafe::InvalidAttestation(
InvalidAttestation::SourceExceedsTarget,
));
}
let validator_id = Self::get_validator_id(txn, validator_pubkey)?;
// 1. Check for a double vote. Namely, an existing attestation with the same target epoch,
// and a different signing root.
let same_target_att = txn
.prepare(
"SELECT source_epoch, target_epoch, signing_root
FROM signed_attestations
WHERE validator_id = ?1 AND target_epoch = ?2",
)?
.query_row(
params![validator_id, att_target_epoch],
SignedAttestation::from_row,
)
.optional()?;
if let Some(existing_attestation) = same_target_att {
// If the new attestation is identical to the existing attestation, then we already
// know that it is safe, and can return immediately.
if existing_attestation.signing_root == attestation.signing_root(domain) {
return Ok(Safe::SameData);
// Otherwise if the hashes are different, this is a double vote.
} else {
return Err(NotSafe::InvalidAttestation(InvalidAttestation::DoubleVote(
existing_attestation,
)));
}
}
// 2. Check that no previous vote is surrounding `attestation`.
// If there is a surrounding attestation, we only return the most recent one.
let surrounding_attestation = txn
.prepare(
"SELECT source_epoch, target_epoch, signing_root
FROM signed_attestations
WHERE validator_id = ?1 AND source_epoch < ?2 AND target_epoch > ?3
ORDER BY target_epoch DESC
LIMIT 1",
)?
.query_row(
params![validator_id, att_source_epoch, att_target_epoch],
SignedAttestation::from_row,
)
.optional()?;
if let Some(prev) = surrounding_attestation {
return Err(NotSafe::InvalidAttestation(
InvalidAttestation::PrevSurroundsNew { prev },
));
}
// 3. Check that no previous vote is surrounded by `attestation`.
// If there is a surrounded attestation, we only return the most recent one.
let surrounded_attestation = txn
.prepare(
"SELECT source_epoch, target_epoch, signing_root
FROM signed_attestations
WHERE validator_id = ?1 AND source_epoch > ?2 AND target_epoch < ?3
ORDER BY target_epoch DESC
LIMIT 1",
)?
.query_row(
params![validator_id, att_source_epoch, att_target_epoch],
SignedAttestation::from_row,
)
.optional()?;
if let Some(prev) = surrounded_attestation {
return Err(NotSafe::InvalidAttestation(
InvalidAttestation::NewSurroundsPrev { prev },
));
}
// Everything has been checked, return Valid
Ok(Safe::Valid)
}
/// Insert a block proposal into the slashing database.
///
/// This should *only* be called in the same (exclusive) transaction as `check_block_proposal`
/// so that the check isn't invalidated by a concurrent mutation.
fn insert_block_proposal(
&self,
txn: &Transaction,
validator_pubkey: &PublicKey,
block_header: &BeaconBlockHeader,
domain: Hash256,
) -> Result<(), NotSafe> {
let validator_id = Self::get_validator_id(txn, validator_pubkey)?;
txn.execute(
"INSERT INTO signed_blocks (validator_id, slot, signing_root)
VALUES (?1, ?2, ?3)",
params![
validator_id,
block_header.slot,
block_header.signing_root(domain).as_bytes()
],
)?;
Ok(())
}
/// Insert an attestation into the slashing database.
///
/// This should *only* be called in the same (exclusive) transaction as `check_attestation`
/// so that the check isn't invalidated by a concurrent mutation.
fn insert_attestation(
&self,
txn: &Transaction,
validator_pubkey: &PublicKey,
attestation: &AttestationData,
domain: Hash256,
) -> Result<(), NotSafe> {
let validator_id = Self::get_validator_id(txn, validator_pubkey)?;
txn.execute(
"INSERT INTO signed_attestations (validator_id, source_epoch, target_epoch, signing_root)
VALUES (?1, ?2, ?3, ?4)",
params![
validator_id,
attestation.source.epoch,
attestation.target.epoch,
attestation.signing_root(domain).as_bytes()
],
)?;
Ok(())
}
/// Check a block proposal for slash safety, and if it is safe, record it in the database.
///
/// The checking and inserting happen atomically and exclusively. We enforce exclusivity
/// to prevent concurrent checks and inserts from resulting in slashable data being inserted.
///
/// This is the safe, externally-callable interface for checking block proposals.
pub fn check_and_insert_block_proposal(
&self,
validator_pubkey: &PublicKey,
block_header: &BeaconBlockHeader,
domain: Hash256,
) -> Result<Safe, NotSafe> {
let mut conn = self.conn_pool.get()?;
let txn = conn.transaction_with_behavior(TransactionBehavior::Exclusive)?;
let safe = self.check_block_proposal(&txn, validator_pubkey, block_header, domain)?;
if safe != Safe::SameData {
self.insert_block_proposal(&txn, validator_pubkey, block_header, domain)?;
}
txn.commit()?;
Ok(safe)
}
/// Check an attestation for slash safety, and if it is safe, record it in the database.
///
/// The checking and inserting happen atomically and exclusively. We enforce exclusivity
/// to prevent concurrent checks and inserts from resulting in slashable data being inserted.
///
/// This is the safe, externally-callable interface for checking attestations.
pub fn check_and_insert_attestation(
&self,
validator_pubkey: &PublicKey,
attestation: &AttestationData,
domain: Hash256,
) -> Result<Safe, NotSafe> {
let mut conn = self.conn_pool.get()?;
let txn = conn.transaction_with_behavior(TransactionBehavior::Exclusive)?;
let safe = self.check_attestation(&txn, validator_pubkey, attestation, domain)?;
if safe != Safe::SameData {
self.insert_attestation(&txn, validator_pubkey, attestation, domain)?;
}
txn.commit()?;
Ok(safe)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::pubkey;
use tempfile::tempdir;
#[test]
fn open_non_existent_error() {
let dir = tempdir().unwrap();
let file = dir.path().join("db.sqlite");
assert!(SlashingDatabase::open(&file).is_err());
}
// Due to the exclusive locking, trying to use an already open database should error.
#[test]
fn double_open_error() {
let dir = tempdir().unwrap();
let file = dir.path().join("db.sqlite");
let _db1 = SlashingDatabase::create(&file).unwrap();
let db2 = SlashingDatabase::open(&file).unwrap();
db2.register_validator(&pubkey(0)).unwrap_err();
}
// Attempting to create the same database twice should error.
#[test]
fn double_create_error() {
let dir = tempdir().unwrap();
let file = dir.path().join("db.sqlite");
let _db1 = SlashingDatabase::create(&file).unwrap();
drop(_db1);
SlashingDatabase::create(&file).unwrap_err();
}
// Check that both `open` and `create` apply the same connection settings.
#[test]
fn connection_settings_applied() {
let dir = tempdir().unwrap();
let file = dir.path().join("db.sqlite");
let check = |db: &SlashingDatabase| {
assert_eq!(db.conn_pool.max_size(), POOL_SIZE);
assert_eq!(db.conn_pool.connection_timeout(), CONNECTION_TIMEOUT);
let conn = db.conn_pool.get().unwrap();
assert_eq!(
conn.pragma_query_value(None, "foreign_keys", |row| { row.get::<_, bool>(0) })
.unwrap(),
true
);
assert_eq!(
conn.pragma_query_value(None, "locking_mode", |row| { row.get::<_, String>(0) })
.unwrap()
.to_uppercase(),
"EXCLUSIVE"
);
};
let db1 = SlashingDatabase::create(&file).unwrap();
check(&db1);
drop(db1);
let db2 = SlashingDatabase::open(&file).unwrap();
check(&db2);
}
}

View File

@@ -0,0 +1,116 @@
#![cfg(test)]
use crate::*;
use tempfile::tempdir;
use types::{
test_utils::generate_deterministic_keypair, AttestationData, BeaconBlockHeader, Hash256,
};
pub const DEFAULT_VALIDATOR_INDEX: usize = 0;
pub const DEFAULT_DOMAIN: Hash256 = Hash256::zero();
pub fn pubkey(index: usize) -> PublicKey {
generate_deterministic_keypair(index).pk
}
pub struct Test<T> {
pubkey: PublicKey,
data: T,
domain: Hash256,
expected: Result<Safe, NotSafe>,
}
impl<T> Test<T> {
pub fn single(data: T) -> Self {
Self::with_pubkey(pubkey(DEFAULT_VALIDATOR_INDEX), data)
}
pub fn with_pubkey(pubkey: PublicKey, data: T) -> Self {
Self {
pubkey,
data,
domain: DEFAULT_DOMAIN,
expected: Ok(Safe::Valid),
}
}
pub fn with_domain(mut self, domain: Hash256) -> Self {
self.domain = domain;
self
}
pub fn expect_result(mut self, result: Result<Safe, NotSafe>) -> Self {
self.expected = result;
self
}
pub fn expect_invalid_att(self, error: InvalidAttestation) -> Self {
self.expect_result(Err(NotSafe::InvalidAttestation(error)))
}
pub fn expect_invalid_block(self, error: InvalidBlock) -> Self {
self.expect_result(Err(NotSafe::InvalidBlock(error)))
}
pub fn expect_same_data(self) -> Self {
self.expect_result(Ok(Safe::SameData))
}
}
pub struct StreamTest<T> {
/// Validators to register.
pub registered_validators: Vec<PublicKey>,
/// Vector of cases and the value expected when calling `check_and_insert_X`.
pub cases: Vec<Test<T>>,
}
impl<T> Default for StreamTest<T> {
fn default() -> Self {
Self {
registered_validators: vec![pubkey(DEFAULT_VALIDATOR_INDEX)],
cases: vec![],
}
}
}
impl StreamTest<AttestationData> {
pub fn run(&self) {
let dir = tempdir().unwrap();
let slashing_db_file = dir.path().join("slashing_protection.sqlite");
let slashing_db = SlashingDatabase::create(&slashing_db_file).unwrap();
for pubkey in &self.registered_validators {
slashing_db.register_validator(pubkey).unwrap();
}
for (i, test) in self.cases.iter().enumerate() {
assert_eq!(
slashing_db.check_and_insert_attestation(&test.pubkey, &test.data, test.domain),
test.expected,
"attestation {} not processed as expected",
i
);
}
}
}
impl StreamTest<BeaconBlockHeader> {
pub fn run(&self) {
let dir = tempdir().unwrap();
let slashing_db_file = dir.path().join("slashing_protection.sqlite");
let slashing_db = SlashingDatabase::create(&slashing_db_file).unwrap();
for pubkey in &self.registered_validators {
slashing_db.register_validator(pubkey).unwrap();
}
for (i, test) in self.cases.iter().enumerate() {
assert_eq!(
slashing_db.check_and_insert_block_proposal(&test.pubkey, &test.data, test.domain),
test.expected,
"attestation {} not processed as expected",
i
);
}
}
}