mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-15 19:02:42 +00:00
Implement slashing protection interchange format (#1544)
## Issue Addressed Implements support for importing and exporting the slashing protection DB interchange format described here: https://hackmd.io/@sproul/Bk0Y0qdGD Also closes #1584 ## Proposed Changes * [x] Support for serializing and deserializing the format * [x] Support for importing and exporting Lighthouse's database * [x] CLI commands to invoke import and export * [x] Export to minimal format (required when a minimal format has been previously imported) * [x] Tests for export to minimal (utilising mixed importing and attestation signing?) * [x] Tests for import/export of complete format, and import of minimal format * [x] ~~Prevent attestations with sources less than our max source (Danny's suggestion). Required for the fake attestation that we put in for the minimal format to block attestations from source 0.~~ * [x] Add the concept of a "low watermark" for compatibility with the minimal format Bonus! * [x] A fix to a potentially nasty bug involving validators getting re-registered each time the validator client ran! Thankfully, the ordering of keys meant that the validator IDs used for attestations and blocks remained stable -- otherwise we could have had some slashings on our hands! 😱 * [x] Tests to confirm that this bug is indeed vanquished
This commit is contained in:
@@ -1,12 +1,16 @@
|
||||
use crate::interchange::{
|
||||
CompleteInterchangeData, Interchange, InterchangeFormat, InterchangeMetadata,
|
||||
SignedAttestation as InterchangeAttestation, SignedBlock as InterchangeBlock,
|
||||
};
|
||||
use crate::signed_attestation::InvalidAttestation;
|
||||
use crate::signed_block::InvalidBlock;
|
||||
use crate::{NotSafe, Safe, SignedAttestation, SignedBlock};
|
||||
use crate::{hash256_from_row, NotSafe, Safe, SignedAttestation, SignedBlock};
|
||||
use r2d2_sqlite::SqliteConnectionManager;
|
||||
use rusqlite::{params, OptionalExtension, Transaction, TransactionBehavior};
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use types::{AttestationData, BeaconBlockHeader, Hash256, PublicKey, SignedRoot};
|
||||
use types::{AttestationData, BeaconBlockHeader, Epoch, Hash256, PublicKey, SignedRoot, Slot};
|
||||
|
||||
type Pool = r2d2::Pool<SqliteConnectionManager>;
|
||||
|
||||
@@ -20,6 +24,9 @@ pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
#[cfg(test)]
|
||||
pub const CONNECTION_TIMEOUT: Duration = Duration::from_millis(100);
|
||||
|
||||
/// Supported version of the interchange format.
|
||||
pub const SUPPORTED_INTERCHANGE_FORMAT_VERSION: u64 = 4;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SlashingDatabase {
|
||||
conn_pool: Pool,
|
||||
@@ -52,7 +59,7 @@ impl SlashingDatabase {
|
||||
conn.execute(
|
||||
"CREATE TABLE validators (
|
||||
id INTEGER PRIMARY KEY,
|
||||
public_key BLOB NOT NULL
|
||||
public_key BLOB NOT NULL UNIQUE
|
||||
)",
|
||||
params![],
|
||||
)?;
|
||||
@@ -144,15 +151,25 @@ impl SlashingDatabase {
|
||||
) -> Result<(), NotSafe> {
|
||||
let mut conn = self.conn_pool.get()?;
|
||||
let txn = conn.transaction()?;
|
||||
{
|
||||
let mut stmt = txn.prepare("INSERT INTO validators (public_key) VALUES (?1)")?;
|
||||
self.register_validators_in_txn(public_keys, &txn)?;
|
||||
txn.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
for pubkey in public_keys {
|
||||
/// Register multiple validators inside the given transaction.
|
||||
///
|
||||
/// The caller must commit the transaction for the changes to be persisted.
|
||||
pub fn register_validators_in_txn<'a>(
|
||||
&self,
|
||||
public_keys: impl Iterator<Item = &'a PublicKey>,
|
||||
txn: &Transaction,
|
||||
) -> Result<(), NotSafe> {
|
||||
let mut stmt = txn.prepare("INSERT INTO validators (public_key) VALUES (?1)")?;
|
||||
for pubkey in public_keys {
|
||||
if self.get_validator_id_opt(&txn, pubkey)?.is_none() {
|
||||
stmt.execute(&[pubkey.to_hex_string()])?;
|
||||
}
|
||||
}
|
||||
txn.commit()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -160,14 +177,34 @@ impl SlashingDatabase {
|
||||
///
|
||||
/// This is NOT the same as a validator index, and depends on the ordering that validators
|
||||
/// are registered with the slashing protection database (and may vary between machines).
|
||||
fn get_validator_id(txn: &Transaction, public_key: &PublicKey) -> Result<i64, NotSafe> {
|
||||
txn.query_row(
|
||||
"SELECT id FROM validators WHERE public_key = ?1",
|
||||
params![&public_key.to_hex_string()],
|
||||
|row| row.get(0),
|
||||
)
|
||||
.optional()?
|
||||
.ok_or_else(|| NotSafe::UnregisteredValidator(public_key.clone()))
|
||||
pub fn get_validator_id(&self, public_key: &PublicKey) -> Result<i64, NotSafe> {
|
||||
let mut conn = self.conn_pool.get()?;
|
||||
let txn = conn.transaction()?;
|
||||
self.get_validator_id_in_txn(&txn, public_key)
|
||||
}
|
||||
|
||||
fn get_validator_id_in_txn(
|
||||
&self,
|
||||
txn: &Transaction,
|
||||
public_key: &PublicKey,
|
||||
) -> Result<i64, NotSafe> {
|
||||
self.get_validator_id_opt(txn, public_key)?
|
||||
.ok_or_else(|| NotSafe::UnregisteredValidator(public_key.clone()))
|
||||
}
|
||||
|
||||
/// Optional version of `get_validator_id`.
|
||||
fn get_validator_id_opt(
|
||||
&self,
|
||||
txn: &Transaction,
|
||||
public_key: &PublicKey,
|
||||
) -> Result<Option<i64>, NotSafe> {
|
||||
Ok(txn
|
||||
.query_row(
|
||||
"SELECT id FROM validators WHERE public_key = ?1",
|
||||
params![&public_key.to_hex_string()],
|
||||
|row| row.get(0),
|
||||
)
|
||||
.optional()?)
|
||||
}
|
||||
|
||||
/// Check a block proposal from `validator_pubkey` for slash safety.
|
||||
@@ -175,10 +212,10 @@ impl SlashingDatabase {
|
||||
&self,
|
||||
txn: &Transaction,
|
||||
validator_pubkey: &PublicKey,
|
||||
block_header: &BeaconBlockHeader,
|
||||
domain: Hash256,
|
||||
slot: Slot,
|
||||
signing_root: Hash256,
|
||||
) -> Result<Safe, NotSafe> {
|
||||
let validator_id = Self::get_validator_id(txn, validator_pubkey)?;
|
||||
let validator_id = self.get_validator_id_in_txn(txn, validator_pubkey)?;
|
||||
|
||||
let existing_block = txn
|
||||
.prepare(
|
||||
@@ -186,25 +223,37 @@ impl SlashingDatabase {
|
||||
FROM signed_blocks
|
||||
WHERE validator_id = ?1 AND slot = ?2",
|
||||
)?
|
||||
.query_row(
|
||||
params![validator_id, block_header.slot],
|
||||
SignedBlock::from_row,
|
||||
)
|
||||
.query_row(params![validator_id, slot], SignedBlock::from_row)
|
||||
.optional()?;
|
||||
|
||||
if let Some(existing_block) = existing_block {
|
||||
if existing_block.signing_root == block_header.signing_root(domain) {
|
||||
if existing_block.signing_root == signing_root {
|
||||
// Same slot and same hash -> we're re-broadcasting a previously signed block
|
||||
Ok(Safe::SameData)
|
||||
return Ok(Safe::SameData);
|
||||
} else {
|
||||
// Same epoch but not the same hash -> it's a DoubleBlockProposal
|
||||
Err(NotSafe::InvalidBlock(InvalidBlock::DoubleBlockProposal(
|
||||
return Err(NotSafe::InvalidBlock(InvalidBlock::DoubleBlockProposal(
|
||||
existing_block,
|
||||
)))
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
Ok(Safe::Valid)
|
||||
}
|
||||
|
||||
let min_slot = txn
|
||||
.prepare("SELECT MIN(slot) FROM signed_blocks WHERE validator_id = ?1")?
|
||||
.query_row(params![validator_id], |row| row.get(0))?;
|
||||
|
||||
if let Some(min_slot) = min_slot {
|
||||
if slot <= min_slot {
|
||||
return Err(NotSafe::InvalidBlock(
|
||||
InvalidBlock::SlotViolatesLowerBound {
|
||||
block_slot: slot,
|
||||
bound_slot: min_slot,
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Safe::Valid)
|
||||
}
|
||||
|
||||
/// Check an attestation from `validator_pubkey` for slash safety.
|
||||
@@ -212,12 +261,10 @@ impl SlashingDatabase {
|
||||
&self,
|
||||
txn: &Transaction,
|
||||
validator_pubkey: &PublicKey,
|
||||
attestation: &AttestationData,
|
||||
domain: Hash256,
|
||||
att_source_epoch: Epoch,
|
||||
att_target_epoch: Epoch,
|
||||
att_signing_root: Hash256,
|
||||
) -> Result<Safe, NotSafe> {
|
||||
let att_source_epoch = attestation.source.epoch;
|
||||
let att_target_epoch = attestation.target.epoch;
|
||||
|
||||
// Although it's not required to avoid slashing, we disallow attestations
|
||||
// which are obviously invalid by virtue of their source epoch exceeding their target.
|
||||
if att_source_epoch > att_target_epoch {
|
||||
@@ -226,10 +273,10 @@ impl SlashingDatabase {
|
||||
));
|
||||
}
|
||||
|
||||
let validator_id = Self::get_validator_id(txn, validator_pubkey)?;
|
||||
let validator_id = self.get_validator_id_in_txn(txn, validator_pubkey)?;
|
||||
|
||||
// 1. Check for a double vote. Namely, an existing attestation with the same target epoch,
|
||||
// and a different signing root.
|
||||
// Check for a double vote. Namely, an existing attestation with the same target epoch,
|
||||
// and a different signing root.
|
||||
let same_target_att = txn
|
||||
.prepare(
|
||||
"SELECT source_epoch, target_epoch, signing_root
|
||||
@@ -245,7 +292,7 @@ impl SlashingDatabase {
|
||||
if let Some(existing_attestation) = same_target_att {
|
||||
// If the new attestation is identical to the existing attestation, then we already
|
||||
// know that it is safe, and can return immediately.
|
||||
if existing_attestation.signing_root == attestation.signing_root(domain) {
|
||||
if existing_attestation.signing_root == att_signing_root {
|
||||
return Ok(Safe::SameData);
|
||||
// Otherwise if the hashes are different, this is a double vote.
|
||||
} else {
|
||||
@@ -255,7 +302,7 @@ impl SlashingDatabase {
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Check that no previous vote is surrounding `attestation`.
|
||||
// Check that no previous vote is surrounding `attestation`.
|
||||
// If there is a surrounding attestation, we only return the most recent one.
|
||||
let surrounding_attestation = txn
|
||||
.prepare(
|
||||
@@ -277,7 +324,7 @@ impl SlashingDatabase {
|
||||
));
|
||||
}
|
||||
|
||||
// 3. Check that no previous vote is surrounded by `attestation`.
|
||||
// Check that no previous vote is surrounded by `attestation`.
|
||||
// If there is a surrounded attestation, we only return the most recent one.
|
||||
let surrounded_attestation = txn
|
||||
.prepare(
|
||||
@@ -299,6 +346,39 @@ impl SlashingDatabase {
|
||||
));
|
||||
}
|
||||
|
||||
// Check lower bounds: ensure that source is greater than or equal to min source,
|
||||
// and target is greater than min target. This allows pruning, and compatibility
|
||||
// with the interchange format.
|
||||
let min_source = txn
|
||||
.prepare("SELECT MIN(source_epoch) FROM signed_attestations WHERE validator_id = ?1")?
|
||||
.query_row(params![validator_id], |row| row.get(0))?;
|
||||
|
||||
if let Some(min_source) = min_source {
|
||||
if att_source_epoch < min_source {
|
||||
return Err(NotSafe::InvalidAttestation(
|
||||
InvalidAttestation::SourceLessThanLowerBound {
|
||||
source_epoch: att_source_epoch,
|
||||
bound_epoch: min_source,
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let min_target = txn
|
||||
.prepare("SELECT MIN(target_epoch) FROM signed_attestations WHERE validator_id = ?1")?
|
||||
.query_row(params![validator_id], |row| row.get(0))?;
|
||||
|
||||
if let Some(min_target) = min_target {
|
||||
if att_target_epoch <= min_target {
|
||||
return Err(NotSafe::InvalidAttestation(
|
||||
InvalidAttestation::TargetLessThanOrEqLowerBound {
|
||||
target_epoch: att_target_epoch,
|
||||
bound_epoch: min_target,
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Everything has been checked, return Valid
|
||||
Ok(Safe::Valid)
|
||||
}
|
||||
@@ -311,19 +391,15 @@ impl SlashingDatabase {
|
||||
&self,
|
||||
txn: &Transaction,
|
||||
validator_pubkey: &PublicKey,
|
||||
block_header: &BeaconBlockHeader,
|
||||
domain: Hash256,
|
||||
slot: Slot,
|
||||
signing_root: Hash256,
|
||||
) -> Result<(), NotSafe> {
|
||||
let validator_id = Self::get_validator_id(txn, validator_pubkey)?;
|
||||
let validator_id = self.get_validator_id_in_txn(txn, validator_pubkey)?;
|
||||
|
||||
txn.execute(
|
||||
"INSERT INTO signed_blocks (validator_id, slot, signing_root)
|
||||
VALUES (?1, ?2, ?3)",
|
||||
params![
|
||||
validator_id,
|
||||
block_header.slot,
|
||||
block_header.signing_root(domain).as_bytes()
|
||||
],
|
||||
params![validator_id, slot, signing_root.as_bytes()],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -336,19 +412,20 @@ impl SlashingDatabase {
|
||||
&self,
|
||||
txn: &Transaction,
|
||||
validator_pubkey: &PublicKey,
|
||||
attestation: &AttestationData,
|
||||
domain: Hash256,
|
||||
att_source_epoch: Epoch,
|
||||
att_target_epoch: Epoch,
|
||||
att_signing_root: Hash256,
|
||||
) -> Result<(), NotSafe> {
|
||||
let validator_id = Self::get_validator_id(txn, validator_pubkey)?;
|
||||
let validator_id = self.get_validator_id_in_txn(txn, validator_pubkey)?;
|
||||
|
||||
txn.execute(
|
||||
"INSERT INTO signed_attestations (validator_id, source_epoch, target_epoch, signing_root)
|
||||
VALUES (?1, ?2, ?3, ?4)",
|
||||
params![
|
||||
validator_id,
|
||||
attestation.source.epoch,
|
||||
attestation.target.epoch,
|
||||
attestation.signing_root(domain).as_bytes()
|
||||
att_source_epoch,
|
||||
att_target_epoch,
|
||||
att_signing_root.as_bytes()
|
||||
],
|
||||
)?;
|
||||
Ok(())
|
||||
@@ -365,17 +442,46 @@ impl SlashingDatabase {
|
||||
validator_pubkey: &PublicKey,
|
||||
block_header: &BeaconBlockHeader,
|
||||
domain: Hash256,
|
||||
) -> Result<Safe, NotSafe> {
|
||||
self.check_and_insert_block_signing_root(
|
||||
validator_pubkey,
|
||||
block_header.slot,
|
||||
block_header.signing_root(domain),
|
||||
)
|
||||
}
|
||||
|
||||
/// As for `check_and_insert_block_proposal` but without requiring the whole `BeaconBlockHeader`.
|
||||
pub fn check_and_insert_block_signing_root(
|
||||
&self,
|
||||
validator_pubkey: &PublicKey,
|
||||
slot: Slot,
|
||||
signing_root: Hash256,
|
||||
) -> Result<Safe, NotSafe> {
|
||||
let mut conn = self.conn_pool.get()?;
|
||||
let txn = conn.transaction_with_behavior(TransactionBehavior::Exclusive)?;
|
||||
let safe = self.check_and_insert_block_signing_root_txn(
|
||||
validator_pubkey,
|
||||
slot,
|
||||
signing_root,
|
||||
&txn,
|
||||
)?;
|
||||
txn.commit()?;
|
||||
Ok(safe)
|
||||
}
|
||||
|
||||
let safe = self.check_block_proposal(&txn, validator_pubkey, block_header, domain)?;
|
||||
/// Transactional variant of `check_and_insert_block_signing_root`.
|
||||
pub fn check_and_insert_block_signing_root_txn(
|
||||
&self,
|
||||
validator_pubkey: &PublicKey,
|
||||
slot: Slot,
|
||||
signing_root: Hash256,
|
||||
txn: &Transaction,
|
||||
) -> Result<Safe, NotSafe> {
|
||||
let safe = self.check_block_proposal(&txn, validator_pubkey, slot, signing_root)?;
|
||||
|
||||
if safe != Safe::SameData {
|
||||
self.insert_block_proposal(&txn, validator_pubkey, block_header, domain)?;
|
||||
self.insert_block_proposal(&txn, validator_pubkey, slot, signing_root)?;
|
||||
}
|
||||
|
||||
txn.commit()?;
|
||||
Ok(safe)
|
||||
}
|
||||
|
||||
@@ -390,19 +496,238 @@ impl SlashingDatabase {
|
||||
validator_pubkey: &PublicKey,
|
||||
attestation: &AttestationData,
|
||||
domain: Hash256,
|
||||
) -> Result<Safe, NotSafe> {
|
||||
let attestation_signing_root = attestation.signing_root(domain);
|
||||
self.check_and_insert_attestation_signing_root(
|
||||
validator_pubkey,
|
||||
attestation.source.epoch,
|
||||
attestation.target.epoch,
|
||||
attestation_signing_root,
|
||||
)
|
||||
}
|
||||
|
||||
/// As for `check_and_insert_attestation` but without requiring the whole `AttestationData`.
|
||||
pub fn check_and_insert_attestation_signing_root(
|
||||
&self,
|
||||
validator_pubkey: &PublicKey,
|
||||
att_source_epoch: Epoch,
|
||||
att_target_epoch: Epoch,
|
||||
att_signing_root: Hash256,
|
||||
) -> Result<Safe, NotSafe> {
|
||||
let mut conn = self.conn_pool.get()?;
|
||||
let txn = conn.transaction_with_behavior(TransactionBehavior::Exclusive)?;
|
||||
|
||||
let safe = self.check_attestation(&txn, validator_pubkey, attestation, domain)?;
|
||||
|
||||
if safe != Safe::SameData {
|
||||
self.insert_attestation(&txn, validator_pubkey, attestation, domain)?;
|
||||
}
|
||||
|
||||
let safe = self.check_and_insert_attestation_signing_root_txn(
|
||||
validator_pubkey,
|
||||
att_source_epoch,
|
||||
att_target_epoch,
|
||||
att_signing_root,
|
||||
&txn,
|
||||
)?;
|
||||
txn.commit()?;
|
||||
Ok(safe)
|
||||
}
|
||||
|
||||
/// Transactional variant of `check_and_insert_attestation_signing_root`.
|
||||
fn check_and_insert_attestation_signing_root_txn(
|
||||
&self,
|
||||
validator_pubkey: &PublicKey,
|
||||
att_source_epoch: Epoch,
|
||||
att_target_epoch: Epoch,
|
||||
att_signing_root: Hash256,
|
||||
txn: &Transaction,
|
||||
) -> Result<Safe, NotSafe> {
|
||||
let safe = self.check_attestation(
|
||||
&txn,
|
||||
validator_pubkey,
|
||||
att_source_epoch,
|
||||
att_target_epoch,
|
||||
att_signing_root,
|
||||
)?;
|
||||
|
||||
if safe != Safe::SameData {
|
||||
self.insert_attestation(
|
||||
&txn,
|
||||
validator_pubkey,
|
||||
att_source_epoch,
|
||||
att_target_epoch,
|
||||
att_signing_root,
|
||||
)?;
|
||||
}
|
||||
Ok(safe)
|
||||
}
|
||||
|
||||
/// Import slashing protection from another client in the interchange format.
|
||||
pub fn import_interchange_info(
|
||||
&self,
|
||||
interchange: &Interchange,
|
||||
genesis_validators_root: Hash256,
|
||||
) -> Result<(), InterchangeError> {
|
||||
let version = interchange.metadata.interchange_format_version;
|
||||
if version != SUPPORTED_INTERCHANGE_FORMAT_VERSION {
|
||||
return Err(InterchangeError::UnsupportedVersion(version));
|
||||
}
|
||||
|
||||
if genesis_validators_root != interchange.metadata.genesis_validators_root {
|
||||
return Err(InterchangeError::GenesisValidatorsMismatch {
|
||||
client: genesis_validators_root,
|
||||
interchange_file: interchange.metadata.genesis_validators_root,
|
||||
});
|
||||
}
|
||||
|
||||
// Import atomically, to prevent registering validators with partial information.
|
||||
let mut conn = self.conn_pool.get()?;
|
||||
let txn = conn.transaction()?;
|
||||
|
||||
for record in &interchange.data {
|
||||
self.register_validators_in_txn(std::iter::once(&record.pubkey), &txn)?;
|
||||
|
||||
// Insert all signed blocks.
|
||||
for block in &record.signed_blocks {
|
||||
self.check_and_insert_block_signing_root_txn(
|
||||
&record.pubkey,
|
||||
block.slot,
|
||||
block.signing_root.unwrap_or_else(Hash256::zero),
|
||||
&txn,
|
||||
)?;
|
||||
}
|
||||
|
||||
// Insert all signed attestations.
|
||||
for attestation in &record.signed_attestations {
|
||||
self.check_and_insert_attestation_signing_root_txn(
|
||||
&record.pubkey,
|
||||
attestation.source_epoch,
|
||||
attestation.target_epoch,
|
||||
attestation.signing_root.unwrap_or_else(Hash256::zero),
|
||||
&txn,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
txn.commit()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn export_interchange_info(
|
||||
&self,
|
||||
genesis_validators_root: Hash256,
|
||||
) -> Result<Interchange, InterchangeError> {
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
let mut conn = self.conn_pool.get()?;
|
||||
let txn = conn.transaction()?;
|
||||
|
||||
// Map from internal validator pubkey to blocks and attestation for that pubkey.
|
||||
let mut data: BTreeMap<String, (Vec<InterchangeBlock>, Vec<InterchangeAttestation>)> =
|
||||
BTreeMap::new();
|
||||
|
||||
txn.prepare(
|
||||
"SELECT public_key, slot, signing_root
|
||||
FROM signed_blocks, validators
|
||||
WHERE signed_blocks.validator_id = validators.id",
|
||||
)?
|
||||
.query_and_then(params![], |row| {
|
||||
let validator_pubkey: String = row.get(0)?;
|
||||
let slot = row.get(1)?;
|
||||
let signing_root = Some(hash256_from_row(2, &row)?);
|
||||
let signed_block = InterchangeBlock { slot, signing_root };
|
||||
data.entry(validator_pubkey)
|
||||
.or_insert_with(|| (vec![], vec![]))
|
||||
.0
|
||||
.push(signed_block);
|
||||
Ok(())
|
||||
})?
|
||||
.collect::<Result<_, InterchangeError>>()?;
|
||||
|
||||
txn.prepare(
|
||||
"SELECT public_key, source_epoch, target_epoch, signing_root
|
||||
FROM signed_attestations, validators
|
||||
WHERE signed_attestations.validator_id = validators.id",
|
||||
)?
|
||||
.query_and_then(params![], |row| {
|
||||
let validator_pubkey: String = row.get(0)?;
|
||||
let source_epoch = row.get(1)?;
|
||||
let target_epoch = row.get(2)?;
|
||||
let signing_root = Some(hash256_from_row(3, &row)?);
|
||||
let signed_attestation = InterchangeAttestation {
|
||||
source_epoch,
|
||||
target_epoch,
|
||||
signing_root,
|
||||
};
|
||||
data.entry(validator_pubkey)
|
||||
.or_insert_with(|| (vec![], vec![]))
|
||||
.1
|
||||
.push(signed_attestation);
|
||||
Ok(())
|
||||
})?
|
||||
.collect::<Result<_, InterchangeError>>()?;
|
||||
|
||||
let metadata = InterchangeMetadata {
|
||||
interchange_format: InterchangeFormat::Complete,
|
||||
interchange_format_version: SUPPORTED_INTERCHANGE_FORMAT_VERSION,
|
||||
genesis_validators_root,
|
||||
};
|
||||
|
||||
let data = data
|
||||
.into_iter()
|
||||
.map(|(pubkey, (signed_blocks, signed_attestations))| {
|
||||
Ok(CompleteInterchangeData {
|
||||
pubkey: pubkey.parse().map_err(InterchangeError::InvalidPubkey)?,
|
||||
signed_blocks,
|
||||
signed_attestations,
|
||||
})
|
||||
})
|
||||
.collect::<Result<_, InterchangeError>>()?;
|
||||
|
||||
Ok(Interchange { metadata, data })
|
||||
}
|
||||
|
||||
pub fn num_validator_rows(&self) -> Result<u32, NotSafe> {
|
||||
let mut conn = self.conn_pool.get()?;
|
||||
let txn = conn.transaction()?;
|
||||
let count = txn
|
||||
.prepare("SELECT COALESCE(COUNT(*), 0) FROM validators")?
|
||||
.query_row(params![], |row| row.get(0))?;
|
||||
Ok(count)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum InterchangeError {
|
||||
UnsupportedVersion(u64),
|
||||
GenesisValidatorsMismatch {
|
||||
interchange_file: Hash256,
|
||||
client: Hash256,
|
||||
},
|
||||
MinimalAttestationSourceAndTargetInconsistent,
|
||||
SQLError(String),
|
||||
SQLPoolError(r2d2::Error),
|
||||
SerdeJsonError(serde_json::Error),
|
||||
InvalidPubkey(String),
|
||||
NotSafe(NotSafe),
|
||||
}
|
||||
|
||||
impl From<NotSafe> for InterchangeError {
|
||||
fn from(error: NotSafe) -> Self {
|
||||
InterchangeError::NotSafe(error)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<rusqlite::Error> for InterchangeError {
|
||||
fn from(error: rusqlite::Error) -> Self {
|
||||
Self::SQLError(error.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<r2d2::Error> for InterchangeError {
|
||||
fn from(error: r2d2::Error) -> Self {
|
||||
InterchangeError::SQLPoolError(error)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<serde_json::Error> for InterchangeError {
|
||||
fn from(error: serde_json::Error) -> Self {
|
||||
InterchangeError::SerdeJsonError(error)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
Reference in New Issue
Block a user