From 57dfcfd83ab9f4695eecc7e65164c6f4a97d45fd Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Sun, 5 Mar 2023 23:43:31 +0000 Subject: [PATCH] Optimise attestation selection proof signing (#4033) ## Issue Addressed Closes #3963 (hopefully) ## Proposed Changes Compute attestation selection proofs gradually each slot rather than in a single `join_all` at the start of each epoch. On a machine with 5k validators this replaces 5k tasks signing 5k proofs with 1 task that signs 5k/32 ~= 160 proofs each slot. Based on testing with Goerli validators this seems to reduce the average time to produce a signature by preventing Tokio and the OS from falling over each other trying to run hundreds of threads. My testing so far has been with local keystores, which run on a dynamic pool of up to 512 OS threads because they use [`spawn_blocking`](https://docs.rs/tokio/1.11.0/tokio/task/fn.spawn_blocking.html) (and we haven't changed the default). An earlier version of this PR hyper-optimised the time-per-signature metric to the detriment of the entire system's performance (see the reverted commits). The current PR is conservative in that it avoids touching the attestation service at all. I think there's more optimising to do here, but we can come back for that in a future PR rather than expanding the scope of this one. The new algorithm for attestation selection proofs is: - We sign a small batch of selection proofs each slot, for slots up to 8 slots in the future. On average we'll sign one slot's worth of proofs per slot, with an 8 slot lookahead. - The batch is signed halfway through the slot when there is unlikely to be contention for signature production (blocks are <4s, attestations are ~4-6 seconds, aggregates are 8s+). ## Performance Data _See first comment for updated graphs_. Graph of median signing times before this PR: ![signing_times_median](https://user-images.githubusercontent.com/4452260/221495627-3ab3c105-319f-406e-b99d-b5913e0ded9c.png) Graph of update attesters metric (includes selection proof signing) before this PR: ![update_attesters_store](https://user-images.githubusercontent.com/4452260/221497057-01ba40e4-8148-45f6-9e21-36a9567a631a.png) Median signing time after this PR (prototype from 12:00, updated version from 13:30): ![signing_times_median_updated](https://user-images.githubusercontent.com/4452260/221771578-47a040cc-b832-482f-9a1a-d1bd9854e00e.png) 99th percentile on signing times (bounded attestation signing from 16:55, now removed): ![signing_times_99pc](https://user-images.githubusercontent.com/4452260/221772055-e64081a8-2220-45ba-ba6d-9d7e344a5bde.png) Attester map update timing after this PR: ![update_attesters_store_updated](https://user-images.githubusercontent.com/4452260/221771757-c8558a48-7f4e-4bb5-9929-dee177a66c1e.png) Selection proof signings per second change: ![signing_attempts](https://user-images.githubusercontent.com/4452260/221771855-64f5da22-1655-478d-926b-810be8a3650c.png) ## Link to late blocks I believe this is related to the slow block signings because logs from Stakely in #3963 show these two logs almost 5 seconds apart: > Feb 23 18:56:23.978 INFO Received unsigned block, slot: 5862880, service: block, module: validator_client::block_service:393 > Feb 23 18:56:28.552 INFO Publishing signed block, slot: 5862880, service: block, module: validator_client::block_service:416 The only thing that happens between those two logs is the signing of the block: https://github.com/sigp/lighthouse/blob/0fb58a680d6f0c9f0dc8beecf142186debff9a8d/validator_client/src/block_service.rs#L410-L414 Helpfully, Stakely noticed this issue without any Lighthouse BNs in the mix, which pointed to a clear issue in the VC. ## TODO - [x] Further testing on testnet infrastructure. - [x] Make the attestation signing parallelism configurable. --- validator_client/src/block_service.rs | 5 + validator_client/src/duties_service.rs | 189 ++++++++++++++++--- validator_client/src/http_metrics/metrics.rs | 5 + 3 files changed, 173 insertions(+), 26 deletions(-) diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 6fd519ebaf..3b37492377 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -407,17 +407,22 @@ impl BlockService { ) .await?; + let signing_timer = metrics::start_timer(&metrics::BLOCK_SIGNING_TIMES); let signed_block = self_ref .validator_store .sign_block::(*validator_pubkey_ref, block, current_slot) .await .map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))?; + let signing_time_ms = + Duration::from_secs_f64(signing_timer.map_or(0.0, |t| t.stop_and_record())).as_millis(); info!( log, "Publishing signed block"; "slot" => slot.as_u64(), + "signing_time_ms" => signing_time_ms, ); + // Publish block with first available beacon node. self.beacon_nodes .first_success( diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 6ba2a2d1fd..c335c67ab1 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -17,13 +17,14 @@ use crate::{ }; use environment::RuntimeContext; use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId}; -use futures::future::join_all; +use futures::{stream, StreamExt}; use parking_lot::RwLock; use safe_arith::ArithError; use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; -use std::collections::{HashMap, HashSet}; +use std::collections::{hash_map, BTreeMap, HashMap, HashSet}; use std::sync::Arc; +use std::time::Duration; use sync::poll_sync_committee_duties; use sync::SyncDutiesMap; use tokio::{sync::mpsc::Sender, time::sleep}; @@ -40,6 +41,14 @@ const SUBSCRIPTION_BUFFER_SLOTS: u64 = 2; /// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch. const HISTORICAL_DUTIES_EPOCHS: u64 = 2; +/// Compute attestation selection proofs this many slots before they are required. +/// +/// At start-up selection proofs will be computed with less lookahead out of necessity. +const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8; + +/// Fraction of a slot at which selection proof signing should happen (2 means half way). +const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2; + /// Minimum number of validators for which we auto-enable per-validator metrics. /// For validators greater than this value, we need to manually set the `enable-per-validator-metrics` /// flag in the cli to enable collection of per validator metrics. @@ -71,7 +80,7 @@ pub struct DutyAndProof { impl DutyAndProof { /// Instantiate `Self`, computing the selection proof as well. - pub async fn new( + pub async fn new_with_selection_proof( duty: AttesterData, validator_store: &ValidatorStore, spec: &ChainSpec, @@ -99,6 +108,14 @@ impl DutyAndProof { selection_proof, }) } + + /// Create a new `DutyAndProof` with the selection proof waiting to be filled in. + pub fn new_without_selection_proof(duty: AttesterData) -> Self { + Self { + duty, + selection_proof: None, + } + } } /// To assist with readability, the dependent root for attester/proposer duties. @@ -471,7 +488,7 @@ async fn poll_validator_indices( /// 3. Push out any attestation subnet subscriptions to the BN. /// 4. Prune old entries from `duties_service.attesters`. async fn poll_beacon_attesters( - duties_service: &DutiesService, + duties_service: &Arc>, ) -> Result<(), Error> { let current_epoch_timer = metrics::start_timer_vec( &metrics::DUTIES_SERVICE_TIMES, @@ -634,7 +651,7 @@ async fn poll_beacon_attesters( /// For the given `local_indices` and `local_pubkeys`, download the duties for the given `epoch` and /// store them in `duties_service.attesters`. async fn poll_beacon_attesters_for_epoch( - duties_service: &DutiesService, + duties_service: &Arc>, epoch: Epoch, local_indices: &[u64], local_pubkeys: &HashSet, @@ -742,31 +759,16 @@ async fn poll_beacon_attesters_for_epoch( "num_new_duties" => new_duties.len(), ); - // Produce the `DutyAndProof` messages in parallel. - let duty_and_proof_results = join_all(new_duties.into_iter().map(|duty| { - DutyAndProof::new(duty, &duties_service.validator_store, &duties_service.spec) - })) - .await; - // Update the duties service with the new `DutyAndProof` messages. let mut attesters = duties_service.attesters.write(); let mut already_warned = Some(()); - for result in duty_and_proof_results { - let duty_and_proof = match result { - Ok(duty_and_proof) => duty_and_proof, - Err(e) => { - error!( - log, - "Failed to produce duty and proof"; - "error" => ?e, - "msg" => "may impair attestation duties" - ); - // Do not abort the entire batch for a single failure. - continue; - } - }; + for duty in &new_duties { + let attester_map = attesters.entry(duty.pubkey).or_default(); - let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default(); + // Create initial entries in the map without selection proofs. We'll compute them in the + // background later to avoid creating a thundering herd of signing threads whenever new + // duties are computed. + let duty_and_proof = DutyAndProof::new_without_selection_proof(duty.clone()); if let Some((prior_dependent_root, _)) = attester_map.insert(epoch, (dependent_root, duty_and_proof)) @@ -785,9 +787,144 @@ async fn poll_beacon_attesters_for_epoch( } drop(attesters); + // Spawn the background task to compute selection proofs. + let subservice = duties_service.clone(); + duties_service.context.executor.spawn( + async move { + fill_in_selection_proofs(subservice, new_duties, dependent_root).await; + }, + "duties_service_selection_proofs_background", + ); + Ok(()) } +/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map. +/// +/// Duties are computed in batches each slot. If a re-org is detected then the process will +/// terminate early as it is assumed the selection proofs from `duties` are no longer relevant. +async fn fill_in_selection_proofs( + duties_service: Arc>, + duties: Vec, + dependent_root: Hash256, +) { + let log = duties_service.context.log(); + + // Sort duties by slot in a BTreeMap. + let mut duties_by_slot: BTreeMap> = BTreeMap::new(); + + for duty in duties { + duties_by_slot.entry(duty.slot).or_default().push(duty); + } + + // At halfway through each slot when nothing else is likely to be getting signed, sign a batch + // of selection proofs and insert them into the duties service `attesters` map. + let slot_clock = &duties_service.slot_clock; + let slot_offset = duties_service.slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM; + + while !duties_by_slot.is_empty() { + if let Some(duration) = slot_clock.duration_to_next_slot() { + sleep(duration.saturating_sub(slot_offset)).await; + + let Some(current_slot) = slot_clock.now() else { + continue; + }; + + let lookahead_slot = current_slot + SELECTION_PROOF_SLOT_LOOKAHEAD; + + let mut relevant_duties = duties_by_slot.split_off(&lookahead_slot); + std::mem::swap(&mut relevant_duties, &mut duties_by_slot); + + let batch_size = relevant_duties.values().map(Vec::len).sum::(); + + if batch_size == 0 { + continue; + } + + let timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::ATTESTATION_SELECTION_PROOFS], + ); + + // Sign selection proofs (serially). + let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten()) + .then(|duty| async { + DutyAndProof::new_with_selection_proof( + duty, + &duties_service.validator_store, + &duties_service.spec, + ) + .await + }) + .collect::>() + .await; + + // Add to attesters store. + let mut attesters = duties_service.attesters.write(); + for result in duty_and_proof_results { + let duty_and_proof = match result { + Ok(duty_and_proof) => duty_and_proof, + Err(e) => { + error!( + log, + "Failed to produce duty and proof"; + "error" => ?e, + "msg" => "may impair attestation duties" + ); + // Do not abort the entire batch for a single failure. + continue; + } + }; + + let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default(); + let epoch = duty_and_proof.duty.slot.epoch(E::slots_per_epoch()); + match attester_map.entry(epoch) { + hash_map::Entry::Occupied(mut entry) => { + // No need to update duties for which no proof was computed. + let Some(selection_proof) = duty_and_proof.selection_proof else { + continue; + }; + + let (existing_dependent_root, existing_duty) = entry.get_mut(); + + if *existing_dependent_root == dependent_root { + // Replace existing proof. + existing_duty.selection_proof = Some(selection_proof); + } else { + // Our selection proofs are no longer relevant due to a reorg, abandon + // this entire background process. + debug!( + log, + "Stopping selection proof background task"; + "reason" => "re-org" + ); + return; + } + } + hash_map::Entry::Vacant(entry) => { + entry.insert((dependent_root, duty_and_proof)); + } + } + } + drop(attesters); + + let time_taken_ms = + Duration::from_secs_f64(timer.map_or(0.0, |t| t.stop_and_record())).as_millis(); + debug!( + log, + "Computed attestation selection proofs"; + "batch_size" => batch_size, + "lookahead_slot" => lookahead_slot, + "time_taken_ms" => time_taken_ms + ); + } else { + // Just sleep for one slot if we are unable to read the system clock, this gives + // us an opportunity for the clock to eventually come good. + sleep(duties_service.slot_clock.slot_duration()).await; + } + } +} + /// Download the proposer duties for the current epoch and store them in `duties_service.proposers`. /// If there are any proposer for this slot, send out a notification to the block proposers. /// diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs index 9b60d0edec..c9ad31feb4 100644 --- a/validator_client/src/http_metrics/metrics.rs +++ b/validator_client/src/http_metrics/metrics.rs @@ -32,6 +32,7 @@ pub const PROPOSER_DUTIES_HTTP_GET: &str = "proposer_duties_http_get"; pub const VALIDATOR_ID_HTTP_GET: &str = "validator_id_http_get"; pub const SUBSCRIPTIONS_HTTP_POST: &str = "subscriptions_http_post"; pub const UPDATE_PROPOSERS: &str = "update_proposers"; +pub const ATTESTATION_SELECTION_PROOFS: &str = "attestation_selection_proofs"; pub const SUBSCRIPTIONS: &str = "subscriptions"; pub const LOCAL_KEYSTORE: &str = "local_keystore"; pub const WEB3SIGNER: &str = "web3signer"; @@ -172,6 +173,10 @@ lazy_static::lazy_static! { "Duration to obtain a signature", &["type"] ); + pub static ref BLOCK_SIGNING_TIMES: Result = try_create_histogram( + "vc_block_signing_times_seconds", + "Duration to obtain a signature for a block", + ); pub static ref ATTESTATION_DUTY: Result = try_create_int_gauge_vec( "vc_attestation_duty_slot",