diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index b90410aaca..c756b8a6cb 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -451,6 +451,18 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .requires("slasher") .takes_value(true) ) + .arg( + Arg::with_name("slasher-slot-offset") + .long("slasher-slot-offset") + .help( + "Set the delay from the start of the slot at which the slasher should ingest \ + attestations. Only effective if the slasher-update-period is a multiple of the \ + slot duration." + ) + .value_name("SECONDS") + .requires("slasher") + .takes_value(true) + ) .arg( Arg::with_name("slasher-history-length") .long("slasher-history-length") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 12c7d59b5d..52a0932615 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -448,6 +448,19 @@ pub fn get_config<E: EthSpec>( slasher_config.update_period = update_period; } + if let Some(slot_offset) = + clap_utils::parse_optional::<f64>(cli_args, "slasher-slot-offset")? + { + if slot_offset.is_finite() { + slasher_config.slot_offset = slot_offset; + } else { + return Err(format!( + "invalid float for slasher-slot-offset: {}", + slot_offset + )); + } + } + if let Some(history_length) = clap_utils::parse_optional(cli_args, "slasher-history-length")? { diff --git a/book/src/slasher.md b/book/src/slasher.md index d4d70567c5..126573c556 100644 --- a/book/src/slasher.md +++ b/book/src/slasher.md @@ -102,6 +102,31 @@ If the `time_taken` is substantially longer than the update period then it indic struggling under the load, and you should consider increasing the update period or lowering the resource requirements by tweaking the history length. +The update period should almost always be set to a multiple of the slot duration (12 +seconds), or in rare cases a divisor (e.g. 4 seconds).
+ +### Slot Offset + +* Flag: `--slasher-slot-offset SECONDS` +* Argument: number of seconds (decimal allowed) +* Default: 10.5 seconds + +Set the offset from the start of the slot at which slasher processing should run. The default +value of 10.5 seconds is chosen so that de-duplication can be maximally effective. The slasher +will de-duplicate attestations from the same batch by storing only the attestations necessary +to cover all seen validators. In other words, it will store aggregated attestations rather than +unaggregated attestations if given the opportunity. + +Aggregated attestations are published 8 seconds into the slot, so the default allows 2.5 seconds for +them to arrive, and 1.5 seconds for them to be processed before a potential block proposal at the +start of the next slot. If the batch processing time on your machine is significantly longer than +1.5 seconds then you may want to lengthen the update period to 24 seconds, or decrease the slot +offset to a value in the range 8.5-10.5s (lower values may result in more data being stored). + +The slasher will run every `update-period` seconds after the first `slot_start + slot-offset`, which +means the `slot-offset` will be ineffective if the `update-period` is not a multiple (or divisor) of +the slot duration. 
+ ### Chunk Size and Validator Chunk Size * Flags: `--slasher-chunk-size EPOCHS`, `--slasher-validator-chunk-size NUM_VALIDATORS` diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index a7ccdbebd3..290fc7aef3 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -795,6 +795,25 @@ fn slasher_update_period_flag() { }); } #[test] +fn slasher_slot_offset() { + // TODO: check that the offset is actually stored, once the config is un-hacked + // See: https://github.com/sigp/lighthouse/pull/2767#discussion_r741610402 + CommandLineTest::new() + .flag("slasher", None) + .flag("slasher-max-db-size", Some("16")) + .flag("slasher-slot-offset", Some("11.25")) + .run(); +} +#[test] +#[should_panic] +fn slasher_slot_offset_nan() { + CommandLineTest::new() + .flag("slasher", None) + .flag("slasher-max-db-size", Some("16")) + .flag("slasher-slot-offset", Some("NaN")) + .run(); +} +#[test] fn slasher_history_length_flag() { CommandLineTest::new() .flag("slasher", None) diff --git a/slasher/service/src/service.rs b/slasher/service/src/service.rs index 5cfd3f6672..510ed6cd98 100644 --- a/slasher/service/src/service.rs +++ b/slasher/service/src/service.rs @@ -55,11 +55,18 @@ impl SlasherService { // don't need to burden them with more work (we can wait). 
let (notif_sender, notif_receiver) = sync_channel(1); let update_period = slasher.config().update_period; + let slot_offset = slasher.config().slot_offset; let beacon_chain = self.beacon_chain.clone(); let network_sender = self.network_sender.clone(); executor.spawn( - Self::run_notifier(beacon_chain.clone(), update_period, notif_sender, log), + Self::run_notifier( + beacon_chain.clone(), + update_period, + slot_offset, + notif_sender, + log, + ), "slasher_server_notifier", ); @@ -75,12 +82,19 @@ impl<T: BeaconChainTypes> SlasherService<T> { async fn run_notifier( beacon_chain: Arc<BeaconChain<T>>, update_period: u64, + slot_offset: f64, notif_sender: SyncSender<Epoch>, log: Logger, ) { - // NOTE: could align each run to some fixed point in each slot, see: - // https://github.com/sigp/lighthouse/issues/1861 - let mut interval = interval_at(Instant::now(), Duration::from_secs(update_period)); + let slot_offset = Duration::from_secs_f64(slot_offset); + let start_instant = + if let Some(duration_to_next_slot) = beacon_chain.slot_clock.duration_to_next_slot() { + Instant::now() + duration_to_next_slot + slot_offset + } else { + error!(log, "Error aligning slasher to slot clock"); + Instant::now() + }; + let mut interval = interval_at(start_instant, Duration::from_secs(update_period)); loop { interval.tick().await; diff --git a/slasher/src/array.rs b/slasher/src/array.rs index 4ef8de7f42..545c0b7e6f 100644 --- a/slasher/src/array.rs +++ b/slasher/src/array.rs @@ -1,5 +1,5 @@ use crate::metrics::{self, SLASHER_COMPRESSION_RATIO, SLASHER_NUM_CHUNKS_UPDATED}; -use crate::{AttesterRecord, AttesterSlashingStatus, Config, Error, SlasherDB}; +use crate::{AttesterSlashingStatus, Config, Error, IndexedAttesterRecord, SlasherDB}; use flate2::bufread::{ZlibDecoder, ZlibEncoder}; use lmdb::{RwTransaction, Transaction}; use serde_derive::{Deserialize, Serialize}; @@ -486,7 +486,7 @@ pub fn update<E: EthSpec>( db: &SlasherDB<E>, txn: &mut RwTransaction<'_>, validator_chunk_index: usize, - batch: Vec<Arc<(IndexedAttestation<E>, AttesterRecord)>>, + batch: Vec<Arc<IndexedAttesterRecord<E>>>,
current_epoch: Epoch, config: &Config, ) -> Result<HashSet<AttesterSlashing<E>>, Error> { let mut chunk_attestations = BTreeMap::new(); for attestation in batch { chunk_attestations - .entry(config.chunk_index(attestation.0.data.source.epoch)) + .entry(config.chunk_index(attestation.indexed.data.source.epoch)) .or_insert_with(Vec::new) .push(attestation); } @@ -573,7 +573,7 @@ pub fn update_array<E: EthSpec>( db: &SlasherDB<E>, txn: &mut RwTransaction<'_>, validator_chunk_index: usize, - chunk_attestations: &BTreeMap<usize, Vec<Arc<(IndexedAttestation<E>, AttesterRecord)>>>, + chunk_attestations: &BTreeMap<usize, Vec<Arc<IndexedAttesterRecord<E>>>>, current_epoch: Epoch, config: &Config, ) -> Result<HashSet<AttesterSlashing<E>>, Error> { @@ -597,7 +597,7 @@ pub fn update_array<E: EthSpec>( for attestations in chunk_attestations.values() { for attestation in attestations { for validator_index in - config.attesting_validators_in_chunk(&attestation.0, validator_chunk_index) + config.attesting_validators_in_chunk(&attestation.indexed, validator_chunk_index) { let slashing_status = apply_attestation_for_validator::<E>( db, @@ -605,11 +605,11 @@ pub fn update_array<E: EthSpec>( &mut updated_chunks, validator_chunk_index, validator_index, - &attestation.0, + &attestation.indexed, current_epoch, config, )?; - if let Some(slashing) = slashing_status.into_slashing(&attestation.0) { + if let Some(slashing) = slashing_status.into_slashing(&attestation.indexed) { slashings.insert(slashing); } } diff --git a/slasher/src/attestation_queue.rs b/slasher/src/attestation_queue.rs index 70ea2ea13e..3d23932df9 100644 --- a/slasher/src/attestation_queue.rs +++ b/slasher/src/attestation_queue.rs @@ -1,64 +1,85 @@ -use crate::{AttesterRecord, Config}; +use crate::{AttesterRecord, Config, IndexedAttesterRecord}; use parking_lot::Mutex; -use std::collections::BTreeSet; -use std::sync::Arc; -use types::{EthSpec, IndexedAttestation}; +use std::collections::BTreeMap; +use std::sync::{Arc, Weak}; +use types::{EthSpec, Hash256, IndexedAttestation}; /// Staging area for attestations received from the network.
/// -/// To be added to the database in batches, for efficiency and to prevent data races. +/// Attestations are not grouped by validator index at this stage so that they can be easily +/// filtered for timeliness. #[derive(Debug, Default)] pub struct AttestationQueue<E: EthSpec> { - /// All attestations (unique) for storage on disk. - pub queue: Mutex<AttestationBatch<E>>, + pub queue: Mutex<SimpleBatch<E>>, +} + +pub type SimpleBatch<E> = Vec<Arc<IndexedAttesterRecord<E>>>; + +/// Attestations dequeued from the queue and in preparation for processing. +/// +/// This struct is responsible for mapping validator indices to attestations and performing +/// de-duplication to remove redundant attestations. +#[derive(Debug, Default)] +pub struct AttestationBatch<E: EthSpec> { + /// Map from (`validator_index`, `attestation_data_hash`) to indexed attester record. + /// + /// This mapping is used for de-duplication, see: + /// + /// https://github.com/sigp/lighthouse/issues/2112 + pub attesters: BTreeMap<(u64, Hash256), Arc<IndexedAttesterRecord<E>>>, + + /// Vec of all unique indexed attester records. + /// + /// The weak references account for the fact that some records might prove useless after + /// de-duplication. + pub attestations: Vec<Weak<IndexedAttesterRecord<E>>>, } /// Attestations grouped by validator index range. #[derive(Debug)] pub struct GroupedAttestations<E: EthSpec> { - pub subqueues: Vec<AttestationBatch<E>>, -} - -/// A queue of attestations for a range of validator indices. -#[derive(Debug, Default)] -pub struct AttestationBatch<E: EthSpec> { - pub attestations: Vec<Arc<(IndexedAttestation<E>, AttesterRecord)>>, + pub subqueues: Vec<SimpleBatch<E>>, } impl<E: EthSpec> AttestationBatch<E> { - pub fn len(&self) -> usize { - self.attestations.len() + /// Add an attestation to the queue.
+ pub fn queue(&mut self, indexed_record: Arc<IndexedAttesterRecord<E>>) { + self.attestations.push(Arc::downgrade(&indexed_record)); + + let attestation_data_hash = indexed_record.record.attestation_data_hash; + for &validator_index in &indexed_record.indexed.attesting_indices { + self.attesters + .entry((validator_index, attestation_data_hash)) + .and_modify(|existing_entry| { + // If the new record is for the same attestation data but with more bits set + // then replace the existing record so that we might avoid storing the + // smaller indexed attestation. Single-bit attestations will usually be removed + // completely by this process, and aggregates will only be retained if they + // are not redundant with respect to a larger aggregate seen in the same batch. + if existing_entry.indexed.attesting_indices.len() + < indexed_record.indexed.attesting_indices.len() + { + *existing_entry = indexed_record.clone(); + } + }) + .or_insert_with(|| indexed_record.clone()); + } } - pub fn is_empty(&self) -> bool { - self.attestations.is_empty() - } - - /// Group the attestations by validator index. - pub fn group_by_validator_index(self, config: &Config) -> GroupedAttestations<E> { + /// Group the attestations by validator chunk index.
+ pub fn group_by_validator_chunk_index(self, config: &Config) -> GroupedAttestations<E> { let mut grouped_attestations = GroupedAttestations { subqueues: vec![] }; - for attestation in self.attestations { - let subqueue_ids = attestation - .0 - .attesting_indices - .iter() - .map(|validator_index| config.validator_chunk_index(*validator_index)) - .collect::<BTreeSet<_>>(); + for ((validator_index, _), indexed_record) in self.attesters { + let subqueue_id = config.validator_chunk_index(validator_index); - if let Some(max_subqueue_id) = subqueue_ids.iter().next_back() { - if *max_subqueue_id >= grouped_attestations.subqueues.len() { - grouped_attestations - .subqueues - .resize_with(max_subqueue_id + 1, AttestationBatch::default); - } + if subqueue_id >= grouped_attestations.subqueues.len() { + grouped_attestations + .subqueues + .resize_with(subqueue_id + 1, SimpleBatch::default); } - for subqueue_id in subqueue_ids { - grouped_attestations.subqueues[subqueue_id] - .attestations - .push(attestation.clone()); - } + grouped_attestations.subqueues[subqueue_id].push(indexed_record); } grouped_attestations @@ -66,21 +87,18 @@ impl<E: EthSpec> AttestationBatch<E> { } impl<E: EthSpec> AttestationQueue<E> {
pub fn queue(&self, attestation: IndexedAttestation<E>) { let attester_record = AttesterRecord::from(attestation.clone()); - self.queue - .lock() - .attestations - .push(Arc::new((attestation, attester_record))); + let indexed_record = IndexedAttesterRecord::new(attestation, attester_record); + self.queue.lock().push(indexed_record); } - pub fn dequeue(&self) -> AttestationBatch<E> { + pub fn dequeue(&self) -> SimpleBatch<E> { std::mem::take(&mut self.queue.lock()) } - pub fn requeue(&self, batch: AttestationBatch<E>) { - self.queue.lock().attestations.extend(batch.attestations); + pub fn requeue(&self, batch: SimpleBatch<E>) { + self.queue.lock().extend(batch); } pub fn len(&self) -> usize { diff --git a/slasher/src/attester_record.rs b/slasher/src/attester_record.rs index 82d5dc46f9..310118e1ae 100644 --- a/slasher/src/attester_record.rs +++ b/slasher/src/attester_record.rs @@ -1,4 +1,5 @@ use ssz_derive::{Decode, Encode}; +use std::sync::Arc; use tree_hash::TreeHash as _; use tree_hash_derive::TreeHash; use types::{AggregateSignature, EthSpec, Hash256, IndexedAttestation, VariableList}; @@ -11,6 +12,21 @@ pub struct AttesterRecord { pub indexed_attestation_hash: Hash256, } +/// Bundling of an `IndexedAttestation` with an `AttesterRecord`. +/// +/// This struct gets `Arc`d and passed around between each stage of queueing and processing.
+#[derive(Debug)] +pub struct IndexedAttesterRecord<E: EthSpec> { + pub indexed: IndexedAttestation<E>, + pub record: AttesterRecord, +} + +impl<E: EthSpec> IndexedAttesterRecord<E> { + pub fn new(indexed: IndexedAttestation<E>, record: AttesterRecord) -> Arc<Self> { + Arc::new(IndexedAttesterRecord { indexed, record }) + } +} + #[derive(Debug, Clone, Encode, Decode, TreeHash)] struct IndexedAttestationHeader<T: EthSpec> { pub attesting_indices: VariableList<u64, T::MaxValidatorsPerCommittee>, } diff --git a/slasher/src/config.rs b/slasher/src/config.rs index 5a701048bd..f8fcc1c02b 100644 --- a/slasher/src/config.rs +++ b/slasher/src/config.rs @@ -7,6 +7,7 @@ pub const DEFAULT_CHUNK_SIZE: usize = 16; pub const DEFAULT_VALIDATOR_CHUNK_SIZE: usize = 256; pub const DEFAULT_HISTORY_LENGTH: usize = 4096; pub const DEFAULT_UPDATE_PERIOD: u64 = 12; +pub const DEFAULT_SLOT_OFFSET: f64 = 10.5; pub const DEFAULT_MAX_DB_SIZE: usize = 256 * 1024; // 256 GiB pub const DEFAULT_BROADCAST: bool = false; @@ -26,12 +27,19 @@ pub struct Config { pub history_length: usize, /// Update frequency in seconds. pub update_period: u64, + /// Offset from the start of the slot to begin processing. + #[serde(skip, default = "default_slot_offset")] + pub slot_offset: f64, /// Maximum size of the LMDB database in megabytes. pub max_db_size_mbs: usize, /// Whether to broadcast slashings found to the network.
pub broadcast: bool, } +fn default_slot_offset() -> f64 { + DEFAULT_SLOT_OFFSET +} + impl Config { pub fn new(database_path: PathBuf) -> Self { Self { @@ -40,6 +48,7 @@ impl Config { validator_chunk_size: DEFAULT_VALIDATOR_CHUNK_SIZE, history_length: DEFAULT_HISTORY_LENGTH, update_period: DEFAULT_UPDATE_PERIOD, + slot_offset: DEFAULT_SLOT_OFFSET, max_db_size_mbs: DEFAULT_MAX_DB_SIZE, broadcast: DEFAULT_BROADCAST, } diff --git a/slasher/src/lib.rs b/slasher/src/lib.rs index d61c58c8cb..10427ba2f0 100644 --- a/slasher/src/lib.rs +++ b/slasher/src/lib.rs @@ -15,8 +15,8 @@ pub mod test_utils; mod utils; pub use crate::slasher::Slasher; -pub use attestation_queue::{AttestationBatch, AttestationQueue}; -pub use attester_record::AttesterRecord; +pub use attestation_queue::{AttestationBatch, AttestationQueue, SimpleBatch}; +pub use attester_record::{AttesterRecord, IndexedAttesterRecord}; pub use block_queue::BlockQueue; pub use config::Config; pub use database::SlasherDB; diff --git a/slasher/src/metrics.rs b/slasher/src/metrics.rs index 7f95ad8cca..6b21fb013a 100644 --- a/slasher/src/metrics.rs +++ b/slasher/src/metrics.rs @@ -22,6 +22,11 @@ lazy_static! 
{ "slasher_num_attestations_valid", "Number of valid attestations per batch" ); + pub static ref SLASHER_NUM_ATTESTATIONS_STORED_PER_BATCH: Result<IntGauge> = + try_create_int_gauge( + "slasher_num_attestations_stored_per_batch", + "Number of attestations stored per batch" + ); pub static ref SLASHER_NUM_BLOCKS_PROCESSED: Result<IntGauge> = try_create_int_gauge( "slasher_num_blocks_processed", "Number of blocks processed per batch", diff --git a/slasher/src/migrate.rs b/slasher/src/migrate.rs index 1dfca9f69a..020c7aaf9a 100644 --- a/slasher/src/migrate.rs +++ b/slasher/src/migrate.rs @@ -1,4 +1,8 @@ -use crate::{database::CURRENT_SCHEMA_VERSION, Config, Error, SlasherDB}; +use crate::{ + config::{DEFAULT_BROADCAST, DEFAULT_SLOT_OFFSET}, + database::CURRENT_SCHEMA_VERSION, + Config, Error, SlasherDB, +}; use lmdb::RwTransaction; use serde_derive::{Deserialize, Serialize}; use std::path::PathBuf; @@ -25,8 +29,9 @@ impl Into<Config> for ConfigV1 { validator_chunk_size: self.validator_chunk_size, history_length: self.history_length, update_period: self.update_period, + slot_offset: DEFAULT_SLOT_OFFSET, max_db_size_mbs: self.max_db_size_mbs, - broadcast: false, + broadcast: DEFAULT_BROADCAST, } } } diff --git a/slasher/src/slasher.rs b/slasher/src/slasher.rs index d4e3bf4ca9..122ed439a4 100644 --- a/slasher/src/slasher.rs +++ b/slasher/src/slasher.rs @@ -1,11 +1,12 @@ use crate::batch_stats::{AttestationStats, BatchStats, BlockStats}; use crate::metrics::{ self, SLASHER_NUM_ATTESTATIONS_DEFERRED, SLASHER_NUM_ATTESTATIONS_DROPPED, - SLASHER_NUM_ATTESTATIONS_VALID, SLASHER_NUM_BLOCKS_PROCESSED, + SLASHER_NUM_ATTESTATIONS_STORED_PER_BATCH, SLASHER_NUM_ATTESTATIONS_VALID, + SLASHER_NUM_BLOCKS_PROCESSED, }; use crate::{ array, AttestationBatch, AttestationQueue, AttesterRecord, BlockQueue, Config, Error, - ProposerSlashingStatus, SlasherDB, + ProposerSlashingStatus, SimpleBatch, SlasherDB, }; use lmdb::{RwTransaction, Transaction}; use parking_lot::Mutex; @@ -132,33 +133,56 @@ impl<E: EthSpec> Slasher<E> { // Filter
attestations for relevance. let (snapshot, deferred, num_dropped) = self.validate(snapshot, current_epoch); + let num_valid = snapshot.len(); let num_deferred = deferred.len(); self.attestation_queue.requeue(deferred); - // Insert attestations into database. debug!( self.log, - "Storing attestations in slasher DB"; - "num_valid" => snapshot.len(), + "Pre-processing attestations for slasher"; + "num_valid" => num_valid, "num_deferred" => num_deferred, "num_dropped" => num_dropped, ); - metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_VALID, snapshot.len() as i64); + metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_VALID, num_valid as i64); metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_DEFERRED, num_deferred as i64); metrics::set_gauge(&SLASHER_NUM_ATTESTATIONS_DROPPED, num_dropped as i64); - for attestation in snapshot.attestations.iter() { - self.db.store_indexed_attestation( - txn, - attestation.1.indexed_attestation_hash, - &attestation.0, - )?; + // De-duplicate attestations and sort by validator index. + let mut batch = AttestationBatch::default(); + + for indexed_record in snapshot { + batch.queue(indexed_record); } - // Group attestations into batches and process them. - let grouped_attestations = snapshot.group_by_validator_index(&self.config); + // Insert relevant attestations into database. + let mut num_stored = 0; + for weak_record in &batch.attestations { + if let Some(indexed_record) = weak_record.upgrade() { + self.db.store_indexed_attestation( + txn, + indexed_record.record.indexed_attestation_hash, + &indexed_record.indexed, + )?; + num_stored += 1; + } + } + + debug!( + self.log, + "Stored attestations in slasher DB"; + "num_stored" => num_stored, + "num_valid" => num_valid, + ); + metrics::set_gauge( + &SLASHER_NUM_ATTESTATIONS_STORED_PER_BATCH, + num_stored as i64, + ); + + // Group attestations into chunked batches and process them. 
+ let grouped_attestations = batch.group_by_validator_chunk_index(&self.config); for (subqueue_id, subqueue) in grouped_attestations.subqueues.into_iter().enumerate() { - self.process_batch(txn, subqueue_id, subqueue.attestations, current_epoch)?; + self.process_batch(txn, subqueue_id, subqueue, current_epoch)?; } Ok(AttestationStats { num_processed }) } @@ -168,12 +192,17 @@ impl<E: EthSpec> Slasher<E> { &self, txn: &mut RwTransaction<'_>, subqueue_id: usize, - batch: Vec<Arc<(IndexedAttestation<E>, AttesterRecord)>>, + batch: SimpleBatch<E>, current_epoch: Epoch, ) -> Result<(), Error> { // First, check for double votes. for attestation in &batch { - match self.check_double_votes(txn, subqueue_id, &attestation.0, attestation.1) { + match self.check_double_votes( + txn, + subqueue_id, + &attestation.indexed, + attestation.record, + ) { Ok(slashings) => { if !slashings.is_empty() { info!( @@ -270,15 +299,15 @@ impl<E: EthSpec> Slasher<E> { /// Returns `(valid, deferred, num_dropped)`. fn validate( &self, - batch: AttestationBatch<E>, + batch: SimpleBatch<E>, current_epoch: Epoch, - ) -> (AttestationBatch<E>, AttestationBatch<E>, usize) { + ) -> (SimpleBatch<E>, SimpleBatch<E>, usize) { let mut keep = Vec::with_capacity(batch.len()); let mut defer = vec![]; let mut drop_count = 0; - for tuple in batch.attestations.into_iter() { - let attestation = &tuple.0; + for indexed_record in batch { + let attestation = &indexed_record.indexed; let target_epoch = attestation.data.target.epoch; let source_epoch = attestation.data.source.epoch; @@ -292,20 +321,14 @@ impl<E: EthSpec> Slasher<E> { // Check that the attestation's target epoch is acceptable, and defer it // if it's not. if target_epoch > current_epoch { - defer.push(tuple); + defer.push(indexed_record); } else { // Otherwise the attestation is OK to process.
- keep.push(tuple); + keep.push(indexed_record); } } - ( - AttestationBatch { attestations: keep }, - AttestationBatch { - attestations: defer, - }, - drop_count, - ) + (keep, defer, drop_count) } /// Prune unnecessary attestations and blocks from the on-disk database.