mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-30 12:47:05 +00:00
Fix a vulnerability in the slasher whereby it would OOM upon processing an invalid attestation with an artificially high `validator_index`. This fix has already been made available to affected users on the `slasher-fix` branch. - Prevent attestations from being passed to the slasher prior to signature verification. This was unnecessary, as they would later be passed on successful validation as well. - Add a defensive cap on the maximum validator index processable by the slasher. The cap is high enough that it shouldn't be reached for several years, and will quickly result in warning logs if forgotten. - Add a regression test that confirms that the issue is fixed. Co-Authored-By: Michael Sproul <michael@sigmaprime.io>
346 lines
12 KiB
Rust
346 lines
12 KiB
Rust
use crate::batch_stats::{AttestationStats, BatchStats, BlockStats};
|
|
use crate::metrics::{
|
|
self, SLASHER_NUM_ATTESTATIONS_DEFERRED, SLASHER_NUM_ATTESTATIONS_DROPPED,
|
|
SLASHER_NUM_ATTESTATIONS_STORED_PER_BATCH, SLASHER_NUM_ATTESTATIONS_VALID,
|
|
SLASHER_NUM_BLOCKS_PROCESSED,
|
|
};
|
|
use crate::{
|
|
AttestationBatch, AttestationQueue, AttesterRecord, BlockQueue, Config, Error,
|
|
IndexedAttestationId, ProposerSlashingStatus, RwTransaction, SimpleBatch, SlasherDB, array,
|
|
};
|
|
use parking_lot::Mutex;
|
|
use std::collections::HashSet;
|
|
use std::sync::Arc;
|
|
use tracing::{debug, error, info};
|
|
use types::{
|
|
AttesterSlashing, ChainSpec, Epoch, EthSpec, IndexedAttestation, ProposerSlashing,
|
|
SignedBeaconBlockHeader,
|
|
};
|
|
|
|
#[derive(Debug)]
|
|
pub struct Slasher<E: EthSpec> {
|
|
db: SlasherDB<E>,
|
|
attestation_queue: AttestationQueue<E>,
|
|
block_queue: BlockQueue,
|
|
attester_slashings: Mutex<HashSet<AttesterSlashing<E>>>,
|
|
proposer_slashings: Mutex<HashSet<ProposerSlashing>>,
|
|
config: Arc<Config>,
|
|
}
|
|
|
|
impl<E: EthSpec> Slasher<E> {
|
|
pub fn open(config: Config, spec: Arc<ChainSpec>) -> Result<Self, Error> {
|
|
config.validate()?;
|
|
let config = Arc::new(config);
|
|
let db = SlasherDB::open(config.clone(), spec)?;
|
|
Self::from_config_and_db(config, db)
|
|
}
|
|
|
|
/// TESTING ONLY.
|
|
///
|
|
/// Initialise a slasher database from an existing `db`. The caller must ensure that the
|
|
/// database's config matches the one provided.
|
|
pub fn from_config_and_db(config: Arc<Config>, db: SlasherDB<E>) -> Result<Self, Error> {
|
|
config.validate()?;
|
|
let attester_slashings = Mutex::new(HashSet::new());
|
|
let proposer_slashings = Mutex::new(HashSet::new());
|
|
let attestation_queue = AttestationQueue::default();
|
|
let block_queue = BlockQueue::default();
|
|
Ok(Self {
|
|
db,
|
|
attestation_queue,
|
|
block_queue,
|
|
attester_slashings,
|
|
proposer_slashings,
|
|
config,
|
|
})
|
|
}
|
|
|
|
pub fn into_reset_db(self) -> Result<SlasherDB<E>, Error> {
|
|
self.db.reset()?;
|
|
Ok(self.db)
|
|
}
|
|
|
|
/// Harvest all attester slashings found, removing them from the slasher.
|
|
pub fn get_attester_slashings(&self) -> HashSet<AttesterSlashing<E>> {
|
|
std::mem::take(&mut self.attester_slashings.lock())
|
|
}
|
|
|
|
/// Harvest all proposer slashings found, removing them from the slasher.
|
|
pub fn get_proposer_slashings(&self) -> HashSet<ProposerSlashing> {
|
|
std::mem::take(&mut self.proposer_slashings.lock())
|
|
}
|
|
|
|
pub fn config(&self) -> &Config {
|
|
&self.config
|
|
}
|
|
|
|
/// Return the number of attestations in the queue.
|
|
pub fn attestation_queue_len(&self) -> usize {
|
|
self.attestation_queue.len()
|
|
}
|
|
|
|
/// Accept an attestation from the network and queue it for processing.
|
|
pub fn accept_attestation(&self, attestation: IndexedAttestation<E>) {
|
|
self.attestation_queue.queue(attestation);
|
|
}
|
|
|
|
/// Accept a block from the network and queue it for processing.
|
|
pub fn accept_block_header(&self, block_header: SignedBeaconBlockHeader) {
|
|
self.block_queue.queue(block_header);
|
|
}
|
|
|
|
/// Apply queued blocks and attestations to the on-disk database, and detect slashings!
|
|
pub fn process_queued(&self, current_epoch: Epoch) -> Result<BatchStats, Error> {
|
|
let mut txn = self.db.begin_rw_txn()?;
|
|
let block_stats = self.process_blocks(&mut txn)?;
|
|
let attestation_stats = self.process_attestations(current_epoch, &mut txn)?;
|
|
txn.commit()?;
|
|
Ok(BatchStats {
|
|
block_stats,
|
|
attestation_stats,
|
|
})
|
|
}
|
|
|
|
/// Apply queued blocks to the on-disk database.
|
|
///
|
|
/// Return the number of blocks
|
|
pub fn process_blocks(&self, txn: &mut RwTransaction<'_>) -> Result<BlockStats, Error> {
|
|
let blocks = self.block_queue.dequeue();
|
|
let num_processed = blocks.len();
|
|
let mut slashings = vec![];
|
|
|
|
metrics::set_gauge(&SLASHER_NUM_BLOCKS_PROCESSED, blocks.len() as i64);
|
|
|
|
for block in blocks {
|
|
if let ProposerSlashingStatus::DoubleVote(slashing) =
|
|
self.db.check_or_insert_block_proposal(txn, block)?
|
|
{
|
|
slashings.push(*slashing);
|
|
}
|
|
}
|
|
|
|
let num_slashings = slashings.len();
|
|
if !slashings.is_empty() {
|
|
info!("Found {} new proposer slashings!", slashings.len());
|
|
self.proposer_slashings.lock().extend(slashings);
|
|
}
|
|
|
|
Ok(BlockStats {
|
|
num_processed,
|
|
num_slashings,
|
|
})
|
|
}
|
|
|
|
/// Apply queued attestations to the on-disk database.
|
|
pub fn process_attestations(
|
|
&self,
|
|
current_epoch: Epoch,
|
|
txn: &mut RwTransaction<'_>,
|
|
) -> Result<AttestationStats, Error> {
|
|
let snapshot = self.attestation_queue.dequeue();
|
|
let num_processed = snapshot.len();
|
|
|
|
// Filter attestations for relevance.
|
|
let (snapshot, deferred, num_dropped) = self.validate(snapshot, current_epoch);
|
|
let num_valid = snapshot.len();
|
|
let num_deferred = deferred.len();
|
|
self.attestation_queue.requeue(deferred);
|
|
|
|
debug!(
|
|
%num_valid,
|
|
num_deferred,
|
|
num_dropped,
|
|
"Pre-processing attestations for slasher"
|
|
);
|
|
metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_VALID, num_valid as i64);
|
|
metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_DEFERRED, num_deferred as i64);
|
|
metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_DROPPED, num_dropped as i64);
|
|
|
|
// De-duplicate attestations and sort by validator index.
|
|
let mut batch = AttestationBatch::default();
|
|
|
|
for indexed_record in snapshot {
|
|
batch.queue(indexed_record);
|
|
}
|
|
|
|
// Insert relevant attestations into database.
|
|
let mut num_stored = 0;
|
|
for weak_record in &batch.attestations {
|
|
if let Some(indexed_record) = weak_record.upgrade() {
|
|
let indexed_attestation_id = self.db.store_indexed_attestation(
|
|
txn,
|
|
indexed_record.record.indexed_attestation_hash,
|
|
&indexed_record.indexed,
|
|
)?;
|
|
indexed_record.set_id(indexed_attestation_id);
|
|
|
|
// Prime the attestation data root LRU cache.
|
|
self.db.cache_attestation_data_root(
|
|
IndexedAttestationId::new(indexed_attestation_id),
|
|
indexed_record.record.attestation_data_hash,
|
|
);
|
|
|
|
num_stored += 1;
|
|
}
|
|
}
|
|
|
|
debug!(num_stored, ?num_valid, "Stored attestations in slasher DB");
|
|
metrics::set_gauge(
|
|
&SLASHER_NUM_ATTESTATIONS_STORED_PER_BATCH,
|
|
num_stored as i64,
|
|
);
|
|
|
|
// Group attestations into chunked batches and process them.
|
|
let grouped_attestations = batch.group_by_validator_chunk_index(&self.config);
|
|
for (subqueue_id, subqueue) in grouped_attestations.subqueues.into_iter().enumerate() {
|
|
self.process_batch(txn, subqueue_id, subqueue, current_epoch)?;
|
|
}
|
|
|
|
metrics::set_gauge(
|
|
&metrics::SLASHER_ATTESTATION_ROOT_CACHE_SIZE,
|
|
self.db.attestation_root_cache_size() as i64,
|
|
);
|
|
|
|
Ok(AttestationStats { num_processed })
|
|
}
|
|
|
|
/// Process a batch of attestations for a range of validator indices.
|
|
fn process_batch(
|
|
&self,
|
|
txn: &mut RwTransaction<'_>,
|
|
subqueue_id: usize,
|
|
batch: SimpleBatch<E>,
|
|
current_epoch: Epoch,
|
|
) -> Result<(), Error> {
|
|
// First, check for double votes.
|
|
for attestation in &batch {
|
|
let indexed_attestation_id = IndexedAttestationId::new(attestation.get_id());
|
|
match self.check_double_votes(
|
|
txn,
|
|
subqueue_id,
|
|
&attestation.indexed,
|
|
&attestation.record,
|
|
indexed_attestation_id,
|
|
) {
|
|
Ok(slashings) => {
|
|
if !slashings.is_empty() {
|
|
info!("Found {} new double-vote slashings!", slashings.len());
|
|
}
|
|
self.attester_slashings.lock().extend(slashings);
|
|
}
|
|
Err(e) => {
|
|
error!(
|
|
error = ?e,
|
|
"Error checking for double votes"
|
|
);
|
|
return Err(e);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Then check for surrounds using the min-max arrays.
|
|
match array::update(
|
|
&self.db,
|
|
txn,
|
|
subqueue_id,
|
|
batch,
|
|
current_epoch,
|
|
&self.config,
|
|
) {
|
|
Ok(slashings) => {
|
|
if !slashings.is_empty() {
|
|
info!("Found {} new surround slashings!", slashings.len());
|
|
}
|
|
self.attester_slashings.lock().extend(slashings);
|
|
}
|
|
Err(e) => {
|
|
error!(error = ?e, "Error processing array update");
|
|
return Err(e);
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Check for double votes from all validators on `attestation` who match the `subqueue_id`.
|
|
fn check_double_votes(
|
|
&self,
|
|
txn: &mut RwTransaction<'_>,
|
|
subqueue_id: usize,
|
|
attestation: &IndexedAttestation<E>,
|
|
attester_record: &AttesterRecord,
|
|
indexed_attestation_id: IndexedAttestationId,
|
|
) -> Result<HashSet<AttesterSlashing<E>>, Error> {
|
|
let mut slashings = HashSet::new();
|
|
|
|
for validator_index in self
|
|
.config
|
|
.attesting_validators_in_chunk(attestation, subqueue_id)
|
|
{
|
|
let slashing_status = self.db.check_and_update_attester_record(
|
|
txn,
|
|
validator_index,
|
|
attestation,
|
|
attester_record,
|
|
indexed_attestation_id,
|
|
)?;
|
|
|
|
if let Some(slashing) = slashing_status.into_slashing(attestation) {
|
|
debug!(
|
|
validator_index,
|
|
epoch = %slashing.attestation_1().data().target.epoch,
|
|
"Found double-vote slashing"
|
|
);
|
|
slashings.insert(slashing);
|
|
}
|
|
}
|
|
|
|
Ok(slashings)
|
|
}
|
|
|
|
/// Validate the attestations in `batch` for ingestion during `current_epoch`.
|
|
///
|
|
/// Drop any attestations that are too old to ever be relevant, and return any attestations
|
|
/// that might be valid in the future.
|
|
///
|
|
/// Returns `(valid, deferred, num_dropped)`.
|
|
fn validate(
|
|
&self,
|
|
batch: SimpleBatch<E>,
|
|
current_epoch: Epoch,
|
|
) -> (SimpleBatch<E>, SimpleBatch<E>, usize) {
|
|
let mut keep = Vec::with_capacity(batch.len());
|
|
let mut defer = vec![];
|
|
let mut drop_count = 0;
|
|
|
|
for indexed_record in batch {
|
|
let attestation = &indexed_record.indexed;
|
|
let target_epoch = attestation.data().target.epoch;
|
|
let source_epoch = attestation.data().source.epoch;
|
|
|
|
if source_epoch > target_epoch
|
|
|| source_epoch + self.config.history_length as u64 <= current_epoch
|
|
{
|
|
drop_count += 1;
|
|
continue;
|
|
}
|
|
|
|
// Check that the attestation's target epoch is acceptable, and defer it
|
|
// if it's not.
|
|
if target_epoch > current_epoch {
|
|
defer.push(indexed_record);
|
|
} else {
|
|
// Otherwise the attestation is OK to process.
|
|
keep.push(indexed_record);
|
|
}
|
|
}
|
|
|
|
(keep, defer, drop_count)
|
|
}
|
|
|
|
/// Prune unnecessary attestations and blocks from the on-disk database.
|
|
pub fn prune_database(&self, current_epoch: Epoch) -> Result<(), Error> {
|
|
self.db.prune(current_epoch)
|
|
}
|
|
}
|