From 2cd3e3a768688748d8a55dcb91bab7f40a0c71e3 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Fri, 16 Sep 2022 08:54:03 +0000 Subject: [PATCH 01/10] Avoid duplicate committee cache loads (#3574) ## Issue Addressed NA ## Proposed Changes I have observed scenarios on Goerli where Lighthouse was receiving attestations which reference the same, un-cached shuffling on multiple threads at the same time. Lighthouse was then loading the same state from database and determining the shuffling on multiple threads at the same time. This is unnecessary load on the disk and RAM. This PR modifies the shuffling cache so that each entry can be either: - A committee - A promise for a committee (i.e., a `crossbeam_channel::Receiver`) Now, in the scenario where we have thread A and thread B simultaneously requesting the same un-cached shuffling, we will have the following: 1. Thread A will take the write-lock on the shuffling cache, find that there's no cached committee and then create a "promise" (a `crossbeam_channel::Sender`) for a committee before dropping the write-lock. 1. Thread B will then be allowed to take the write-lock for the shuffling cache and find the promise created by thread A. It will block the current thread waiting for thread A to fulfill that promise. 1. Thread A will load the state from disk, obtain the shuffling, send it down the channel, insert the entry into the cache and then continue to verify the attestation. 1. Thread B will then receive the shuffling from the receiver, be un-blocked and then continue to verify the attestation. In the case where thread A fails to generate the shuffling and drops the sender, the next time that specific shuffling is requested we will detect that the channel is disconnected and return a `None` entry for that shuffling. This will cause the shuffling to be re-calculated. 
## Additional Info NA --- Cargo.lock | 1 + beacon_node/beacon_chain/Cargo.toml | 1 + beacon_node/beacon_chain/src/beacon_chain.rs | 30 +- beacon_node/beacon_chain/src/errors.rs | 2 + beacon_node/beacon_chain/src/metrics.rs | 4 + .../beacon_chain/src/shuffling_cache.rs | 325 +++++++++++++++++- .../beacon_chain/src/state_advance_timer.rs | 2 +- .../beacon_processor/worker/gossip_methods.rs | 18 +- consensus/types/src/beacon_state.rs | 20 ++ 9 files changed, 381 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 58c9ec2a72..4ca2739d14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -387,6 +387,7 @@ version = "0.2.0" dependencies = [ "bitvec 0.20.4", "bls", + "crossbeam-channel", "derivative", "environment", "eth1", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 092f3064d5..43cbdf1347 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -63,6 +63,7 @@ superstruct = "0.5.0" hex = "0.4.2" exit-future = "0.2.0" unused_port = {path = "../../common/unused_port"} +crossbeam-channel = "0.5.6" [[test]] name = "beacon_chain_tests" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 077b425c07..609969a9a6 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2646,7 +2646,7 @@ impl BeaconChain { self.shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) .ok_or(Error::AttestationCacheLockTimeout)? 
- .insert(shuffling_id, committee_cache); + .insert_committee_cache(shuffling_id, committee_cache); } } @@ -4490,9 +4490,18 @@ impl BeaconChain { metrics::stop_timer(cache_wait_timer); - if let Some(committee_cache) = shuffling_cache.get(&shuffling_id) { - map_fn(committee_cache, shuffling_id.shuffling_decision_block) + if let Some(cache_item) = shuffling_cache.get(&shuffling_id) { + let committee_cache = cache_item.wait()?; + map_fn(&committee_cache, shuffling_id.shuffling_decision_block) } else { + // Create an entry in the cache that "promises" this value will eventually be computed. + // This avoids the case where multiple threads attempt to produce the same value at the + // same time. + // + // Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same + // promise from being created twice. + let sender = shuffling_cache.create_promise(shuffling_id.clone())?; + // Drop the shuffling cache to avoid holding the lock for any longer than // required. drop(shuffling_cache); @@ -4585,17 +4594,26 @@ impl BeaconChain { state.build_committee_cache(relative_epoch, &self.spec)?; - let committee_cache = state.committee_cache(relative_epoch)?; + let committee_cache = state.take_committee_cache(relative_epoch)?; + let committee_cache = Arc::new(committee_cache); let shuffling_decision_block = shuffling_id.shuffling_decision_block; self.shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) .ok_or(Error::AttestationCacheLockTimeout)? 
- .insert(shuffling_id, committee_cache); + .insert_committee_cache(shuffling_id, &committee_cache); metrics::stop_timer(committee_building_timer); - map_fn(committee_cache, shuffling_decision_block) + if let Err(e) = sender.send(committee_cache.clone()) { + debug!( + self.log, + "Did not fulfil committee promise"; + "error" => %e + ) + } + + map_fn(&committee_cache, shuffling_decision_block) } } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 604fb6bea3..8b547acf0f 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -202,6 +202,8 @@ pub enum BeaconChainError { }, AttestationHeadNotInForkChoice(Hash256), MissingPersistedForkChoice, + CommitteeCacheWait(crossbeam_channel::RecvError), + MaxCommitteePromises(usize), } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index b454a6ff88..ead4a54025 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -254,6 +254,10 @@ lazy_static! 
{ try_create_int_counter("beacon_shuffling_cache_hits_total", "Count of times shuffling cache fulfils request"); pub static ref SHUFFLING_CACHE_MISSES: Result = try_create_int_counter("beacon_shuffling_cache_misses_total", "Count of times shuffling cache fulfils request"); + pub static ref SHUFFLING_CACHE_PROMISE_HITS: Result = + try_create_int_counter("beacon_shuffling_cache_promise_hits_total", "Count of times shuffling cache returns a promise to future shuffling"); + pub static ref SHUFFLING_CACHE_PROMISE_FAILS: Result = + try_create_int_counter("beacon_shuffling_cache_promise_fails_total", "Count of times shuffling cache detects a failed promise"); /* * Early attester cache diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 0bbd4419b9..3fc5bebdf6 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -1,5 +1,7 @@ -use crate::metrics; +use crate::{metrics, BeaconChainError}; +use crossbeam_channel::{bounded, Receiver, Sender, TryRecvError}; use lru::LruCache; +use std::sync::Arc; use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256}; /// The size of the LRU cache that stores committee caches for quicker verification. @@ -9,12 +11,46 @@ use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256 /// ignores a few extra bytes in the caches that should be insignificant compared to the indices). const CACHE_SIZE: usize = 16; +/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this +/// limits the number of concurrent states that can be loaded into memory for the committee cache. +/// This prevents excessive memory usage at the cost of rejecting some attestations. +/// +/// We set this value to 2 since states can be quite large and have a significant impact on memory +/// usage. 
A healthy network cannot have more than a few committee caches and those caches should +/// always be inserted during block import. Unstable networks with a high degree of forking might +/// see some attestations dropped due to this concurrency limit, however I propose that this is +/// better than low-resource nodes going OOM. +const MAX_CONCURRENT_PROMISES: usize = 2; + +#[derive(Clone)] +pub enum CacheItem { + /// A committee. + Committee(Arc), + /// A promise for a future committee. + Promise(Receiver>), +} + +impl CacheItem { + pub fn is_promise(&self) -> bool { + matches!(self, CacheItem::Promise(_)) + } + + pub fn wait(self) -> Result, BeaconChainError> { + match self { + CacheItem::Committee(cache) => Ok(cache), + CacheItem::Promise(receiver) => receiver + .recv() + .map_err(BeaconChainError::CommitteeCacheWait), + } + } +} + /// Provides an LRU cache for `CommitteeCache`. /// /// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like /// a find/replace error. pub struct ShufflingCache { - cache: LruCache, + cache: LruCache, } impl ShufflingCache { @@ -24,27 +60,114 @@ impl ShufflingCache { } } - pub fn get(&mut self, key: &AttestationShufflingId) -> Option<&CommitteeCache> { - let opt = self.cache.get(key); - - if opt.is_some() { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); - } else { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); + pub fn get(&mut self, key: &AttestationShufflingId) -> Option { + match self.cache.get(key) { + // The cache contained the committee cache, return it. + item @ Some(CacheItem::Committee(_)) => { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); + item.cloned() + } + // The cache contains a promise for the committee cache. Check to see if the promise has + // already been resolved, without waiting for it. + item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() { + // The promise has already been resolved. 
Replace the entry in the cache with a + // `Committee` entry and then return the committee. + Ok(committee) => { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); + metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); + let ready = CacheItem::Committee(committee); + self.cache.put(key.clone(), ready.clone()); + Some(ready) + } + // The promise has not yet been resolved. Return the promise so the caller can await + // it. + Err(TryRecvError::Empty) => { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); + metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); + item.cloned() + } + // The sender has been dropped without sending a committee. There was most likely an + // error computing the committee cache. Drop the key from the cache and return + // `None` so the caller can recompute the committee. + // + // It's worth noting that this is the only place where we removed unresolved + // promises from the cache. This means unresolved promises will only be removed if + // we try to access them again. This is OK, since the promises don't consume much + // memory and the nature of the LRU cache means that future, relevant entries will + // still be added to the cache. We expect that *all* promises should be resolved, + // unless there is a programming or database error. + Err(TryRecvError::Disconnected) => { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_FAILS); + metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); + self.cache.pop(key); + None + } + }, + // The cache does not have this committee and it's not already promised to be computed. 
+ None => { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); + None + } } - - opt } pub fn contains(&self, key: &AttestationShufflingId) -> bool { self.cache.contains(key) } - pub fn insert(&mut self, key: AttestationShufflingId, committee_cache: &CommitteeCache) { - if !self.cache.contains(&key) { - self.cache.put(key, committee_cache.clone()); + pub fn insert_committee_cache( + &mut self, + key: AttestationShufflingId, + committee_cache: &T, + ) { + if self + .cache + .get(&key) + // Replace the committee if it's not present or if it's a promise. A bird in the hand is + // worth two in the promise-bush! + .map_or(true, CacheItem::is_promise) + { + self.cache.put( + key, + CacheItem::Committee(committee_cache.to_arc_committee_cache()), + ); } } + + pub fn create_promise( + &mut self, + key: AttestationShufflingId, + ) -> Result>, BeaconChainError> { + let num_active_promises = self + .cache + .iter() + .filter(|(_, item)| item.is_promise()) + .count(); + if num_active_promises >= MAX_CONCURRENT_PROMISES { + return Err(BeaconChainError::MaxCommitteePromises(num_active_promises)); + } + + let (sender, receiver) = bounded(1); + self.cache.put(key, CacheItem::Promise(receiver)); + Ok(sender) + } +} + +/// A helper trait to allow lazy-cloning of the committee cache when inserting into the cache. +pub trait ToArcCommitteeCache { + fn to_arc_committee_cache(&self) -> Arc; +} + +impl ToArcCommitteeCache for CommitteeCache { + fn to_arc_committee_cache(&self) -> Arc { + Arc::new(self.clone()) + } +} + +impl ToArcCommitteeCache for Arc { + fn to_arc_committee_cache(&self) -> Arc { + self.clone() + } } impl Default for ShufflingCache { @@ -79,3 +202,177 @@ impl BlockShufflingIds { } } } + +// Disable tests in debug since the beacon chain harness is slow unless in release. 
+#[cfg(not(debug_assertions))] +#[cfg(test)] +mod test { + use super::*; + use crate::test_utils::EphemeralHarnessType; + use types::*; + + type BeaconChainHarness = + crate::test_utils::BeaconChainHarness>; + + /// Returns two different committee caches for testing. + fn committee_caches() -> (Arc, Arc) { + let harness = BeaconChainHarness::builder(MinimalEthSpec) + .default_spec() + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .build(); + let (mut state, _) = harness.get_current_state_and_root(); + state + .build_committee_cache(RelativeEpoch::Current, &harness.chain.spec) + .unwrap(); + state + .build_committee_cache(RelativeEpoch::Next, &harness.chain.spec) + .unwrap(); + let committee_a = state + .committee_cache(RelativeEpoch::Current) + .unwrap() + .clone(); + let committee_b = state.committee_cache(RelativeEpoch::Next).unwrap().clone(); + assert!(committee_a != committee_b); + (Arc::new(committee_a), Arc::new(committee_b)) + } + + /// Builds a deterministic but incoherent shuffling ID from a `u64`. + fn shuffling_id(id: u64) -> AttestationShufflingId { + AttestationShufflingId { + shuffling_epoch: id.into(), + shuffling_decision_block: Hash256::from_low_u64_be(id), + } + } + + #[test] + fn resolved_promise() { + let (committee_a, _) = committee_caches(); + let id_a = shuffling_id(1); + let mut cache = ShufflingCache::new(); + + // Create a promise. + let sender = cache.create_promise(id_a.clone()).unwrap(); + + // Retrieve the newly created promise. + let item = cache.get(&id_a).unwrap(); + assert!( + matches!(item, CacheItem::Promise(_)), + "the item should be a promise" + ); + + // Resolve the promise. + sender.send(committee_a.clone()).unwrap(); + + // Ensure the promise has been resolved. 
+ let item = cache.get(&id_a).unwrap(); + assert!( + matches!(item, CacheItem::Committee(committee) if committee == committee_a), + "the promise should be resolved" + ); + assert_eq!(cache.cache.len(), 1, "the cache should have one entry"); + } + + #[test] + fn unresolved_promise() { + let id_a = shuffling_id(1); + let mut cache = ShufflingCache::new(); + + // Create a promise. + let sender = cache.create_promise(id_a.clone()).unwrap(); + + // Retrieve the newly created promise. + let item = cache.get(&id_a).unwrap(); + assert!( + matches!(item, CacheItem::Promise(_)), + "the item should be a promise" + ); + + // Drop the sender without resolving the promise, simulating an error computing the + // committee. + drop(sender); + + // Ensure the key now indicates an empty slot. + assert!(cache.get(&id_a).is_none(), "the slot should be empty"); + assert!(cache.cache.is_empty(), "the cache should be empty"); + } + + #[test] + fn two_promises() { + let (committee_a, committee_b) = committee_caches(); + let (id_a, id_b) = (shuffling_id(1), shuffling_id(2)); + let mut cache = ShufflingCache::new(); + + // Create promise A. + let sender_a = cache.create_promise(id_a.clone()).unwrap(); + + // Retrieve promise A. + let item = cache.get(&id_a).unwrap(); + assert!( + matches!(item, CacheItem::Promise(_)), + "item a should be a promise" + ); + + // Create promise B. + let sender_b = cache.create_promise(id_b.clone()).unwrap(); + + // Retrieve promise B. + let item = cache.get(&id_b).unwrap(); + assert!( + matches!(item, CacheItem::Promise(_)), + "item b should be a promise" + ); + + // Resolve promise A. + sender_a.send(committee_a.clone()).unwrap(); + // Ensure promise A has been resolved. + let item = cache.get(&id_a).unwrap(); + assert!( + matches!(item, CacheItem::Committee(committee) if committee == committee_a), + "promise A should be resolved" + ); + + // Resolve promise B. + sender_b.send(committee_b.clone()).unwrap(); + // Ensure promise B has been resolved. 
+ let item = cache.get(&id_b).unwrap(); + assert!( + matches!(item, CacheItem::Committee(committee) if committee == committee_b), + "promise B should be resolved" + ); + + // Check both entries again. + assert!( + matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee) if committee == committee_a), + "promise A should remain resolved" + ); + assert!( + matches!(cache.get(&id_b).unwrap(), CacheItem::Committee(committee) if committee == committee_b), + "promise B should remain resolved" + ); + assert_eq!(cache.cache.len(), 2, "the cache should have two entries"); + } + + #[test] + fn too_many_promises() { + let mut cache = ShufflingCache::new(); + + for i in 0..MAX_CONCURRENT_PROMISES { + cache.create_promise(shuffling_id(i as u64)).unwrap(); + } + + // Ensure that the next promise returns an error. It is important for the application to + // dump his ass when he can't keep his promises, you're a queen and you deserve better. + assert!(matches!( + cache.create_promise(shuffling_id(MAX_CONCURRENT_PROMISES as u64)), + Err(BeaconChainError::MaxCommitteePromises( + MAX_CONCURRENT_PROMISES + )) + )); + assert_eq!( + cache.cache.len(), + MAX_CONCURRENT_PROMISES, + "the cache should have two entries" + ); + } +} diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 4359b6f1e8..72fc973e54 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -394,7 +394,7 @@ fn advance_head( .shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) .ok_or(BeaconChainError::AttestationCacheLockTimeout)? 
- .insert(shuffling_id.clone(), committee_cache); + .insert_committee_cache(shuffling_id.clone(), committee_cache); debug!( log, diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 93ed1b463b..307b569a91 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -1752,6 +1752,19 @@ impl Worker { debug!(self.log, "Attestation for finalized state"; "peer_id" => % peer_id); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } + e @ AttnError::BeaconChainError(BeaconChainError::MaxCommitteePromises(_)) => { + debug!( + self.log, + "Dropping attestation"; + "target_root" => ?failed_att.attestation().data.target.root, + "beacon_block_root" => ?beacon_block_root, + "slot" => ?failed_att.attestation().data.slot, + "type" => ?attestation_type, + "error" => ?e, + "peer_id" => % peer_id + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } AttnError::BeaconChainError(e) => { /* * Lighthouse hit an unexpected error whilst processing the attestation. It @@ -1762,7 +1775,10 @@ impl Worker { */ error!( self.log, - "Unable to validate aggregate"; + "Unable to validate attestation"; + "beacon_block_root" => ?beacon_block_root, + "slot" => ?failed_att.attestation().data.slot, + "type" => ?attestation_type, "peer_id" => %peer_id, "error" => ?e, ); diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index fca200312f..a5d00cdf2d 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -1498,6 +1498,26 @@ impl BeaconState { } } + /// Returns the cache for some `RelativeEpoch`, replacing the existing cache with an + /// un-initialized cache. Returns an error if the existing cache has not been initialized. 
+ pub fn take_committee_cache( + &mut self, + relative_epoch: RelativeEpoch, + ) -> Result { + let i = Self::committee_cache_index(relative_epoch); + let current_epoch = self.current_epoch(); + let cache = self + .committee_caches_mut() + .get_mut(i) + .ok_or(Error::CommitteeCachesOutOfBounds(i))?; + + if cache.is_initialized_at(relative_epoch.into_epoch(current_epoch)) { + Ok(mem::take(cache)) + } else { + Err(Error::CommitteeCacheUninitialized(Some(relative_epoch))) + } + } + /// Drops the cache, leaving it in an uninitialized state. pub fn drop_committee_cache(&mut self, relative_epoch: RelativeEpoch) -> Result<(), Error> { *self.committee_cache_at_index_mut(Self::committee_cache_index(relative_epoch))? = From bde3c168e21b4a62cc3ceed7ad3c89fd16564cb6 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Fri, 16 Sep 2022 08:54:04 +0000 Subject: [PATCH 02/10] Add `lcli block-root` tool (#3580) ## Issue Addressed NA ## Proposed Changes Adds a simple tool for computing the block root of some block from a beacon-API or a file. This is useful for benchmarking. ## Additional Info NA --- lcli/src/block_root.rs | 103 ++++++++++++++++++++++++++++++++++ lcli/src/main.rs | 39 +++++++++++++ lcli/src/transition_blocks.rs | 7 ++- 3 files changed, 147 insertions(+), 2 deletions(-) create mode 100644 lcli/src/block_root.rs diff --git a/lcli/src/block_root.rs b/lcli/src/block_root.rs new file mode 100644 index 0000000000..7631872c5c --- /dev/null +++ b/lcli/src/block_root.rs @@ -0,0 +1,103 @@ +//! # Block Root +//! +//! Use this tool to compute the canonical root of a `SignedBeaconBlock`. This is most likely only +//! useful for benchmarking with tools like `flamegraph`. +//! +//! It can load a block from a SSZ file or download it from a beaconAPI. +//! +//! Logging output is controlled via the `RUST_LOG` environment variable. For example, `export +//! RUST_LOG=debug`. +//! +//! ## Examples +//! +//! Download a block and re-compute the canonical root 5,000 times. +//! +//! 
```ignore +//! lcli block-root \ +//! --beacon-url http://localhost:5052 \ +//! --block-id 0x3d887d30ee25c9c1ce7621ec30a7b49b07d6a03200df9c7206faca52a533f432 \ +//! --runs 5000 +//! ``` +//! +//! Load a block from SSZ and compute the canonical root once. +//! +//! ```ignore +//! lcli block-root \ +//! --block-path /tmp/block.ssz \ +//! --runs 1 +//! ``` +use crate::transition_blocks::load_from_ssz_with; +use clap::ArgMatches; +use clap_utils::{parse_optional, parse_required}; +use environment::Environment; +use eth2::{types::BlockId, BeaconNodeHttpClient, SensitiveUrl, Timeouts}; +use std::path::PathBuf; +use std::time::{Duration, Instant}; +use types::{EthSpec, FullPayload, SignedBeaconBlock}; + +const HTTP_TIMEOUT: Duration = Duration::from_secs(5); + +pub fn run(mut env: Environment, matches: &ArgMatches) -> Result<(), String> { + let spec = &T::default_spec(); + let executor = env.core_context().executor; + + /* + * Parse (most) CLI arguments. + */ + + let block_path: Option = parse_optional(matches, "block-path")?; + let beacon_url: Option = parse_optional(matches, "beacon-url")?; + let runs: usize = parse_required(matches, "runs")?; + + info!("Using {} spec", T::spec_name()); + info!("Doing {} runs", runs); + + /* + * Load the block and pre-state from disk or beaconAPI URL. + */ + + let block: SignedBeaconBlock> = match (block_path, beacon_url) { + (Some(block_path), None) => { + info!("Block path: {:?}", block_path); + load_from_ssz_with(&block_path, spec, SignedBeaconBlock::from_ssz_bytes)? + } + (None, Some(beacon_url)) => { + let block_id: BlockId = parse_required(matches, "block-id")?; + let client = BeaconNodeHttpClient::new(beacon_url, Timeouts::set_all(HTTP_TIMEOUT)); + executor + .handle() + .ok_or("shutdown in progress")? + .block_on(async move { + let block = client + .get_beacon_blocks(block_id) + .await + .map_err(|e| format!("Failed to download block: {:?}", e))? + .ok_or_else(|| format!("Unable to locate block at {:?}", block_id))? 
+ .data; + Ok::<_, String>(block) + }) + .map_err(|e| format!("Failed to complete task: {:?}", e))? + } + _ => return Err("must supply --block-path *or* --beacon-url".into()), + }; + + /* + * Perform the core "runs". + */ + + let mut block_root = None; + for i in 0..runs { + let start = Instant::now(); + + block_root = Some(block.canonical_root()); + + let duration = Instant::now().duration_since(start); + info!("Run {}: {:?}", i, duration); + } + + if let Some(block_root) = block_root { + info!("Block root is {:?}", block_root); + } + + Ok(()) +} diff --git a/lcli/src/main.rs b/lcli/src/main.rs index e6a4eeeacb..84b951e3f1 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -1,5 +1,6 @@ #[macro_use] extern crate log; +mod block_root; mod change_genesis_time; mod check_deposit_data; mod create_payload_header; @@ -714,6 +715,42 @@ fn main() { .help("List of Attestations to convert to indexed form (JSON)"), ) ) + .subcommand( + SubCommand::with_name("block-root") + .about("Computes the block root of some block") + .arg( + Arg::with_name("block-path") + .long("block-path") + .value_name("PATH") + .takes_value(true) + .conflicts_with("beacon-url") + .requires("pre-state-path") + .help("Path to load a SignedBeaconBlock from file as SSZ."), + ) + .arg( + Arg::with_name("beacon-url") + .long("beacon-url") + .value_name("URL") + .takes_value(true) + .help("URL to a beacon-API provider."), + ) + .arg( + Arg::with_name("block-id") + .long("block-id") + .value_name("BLOCK_ID") + .takes_value(true) + .requires("beacon-url") + .help("Identifier for a block as per beacon-API standards (slot, root, etc.)"), + ) + .arg( + Arg::with_name("runs") + .long("runs") + .value_name("INTEGER") + .takes_value(true) + .default_value("1") + .help("Number of repeat runs, useful for benchmarking."), + ) + ) .get_matches(); let result = matches @@ -799,6 +836,8 @@ fn run( .map_err(|e| format!("Failed to run insecure-validators command: {}", e)), ("indexed-attestations", Some(matches)) => 
indexed_attestations::run::(matches) .map_err(|e| format!("Failed to run indexed-attestations command: {}", e)), + ("block-root", Some(matches)) => block_root::run::(env, matches) + .map_err(|e| format!("Failed to run block-root command: {}", e)), (other, _) => Err(format!("Unknown subcommand {}. See --help.", other)), } } diff --git a/lcli/src/transition_blocks.rs b/lcli/src/transition_blocks.rs index 793bdb6422..dc825d2c02 100644 --- a/lcli/src/transition_blocks.rs +++ b/lcli/src/transition_blocks.rs @@ -6,6 +6,9 @@ //! It can load states and blocks from file or pull them from a beaconAPI. Objects pulled from a //! beaconAPI can be saved to disk to reduce future calls to that server. //! +//! Logging output is controlled via the `RUST_LOG` environment variable. For example, `export +//! RUST_LOG=debug`. +//! //! ## Examples //! //! ### Run using a block from a beaconAPI @@ -124,8 +127,8 @@ pub fn run(mut env: Environment, matches: &ArgMatches) -> Result< let (mut pre_state, mut state_root_opt, block) = match (pre_state_path, block_path, beacon_url) { (Some(pre_state_path), Some(block_path), None) => { - info!("Block path: {:?}", pre_state_path); - info!("Pre-state path: {:?}", block_path); + info!("Block path: {:?}", block_path); + info!("Pre-state path: {:?}", pre_state_path); let pre_state = load_from_ssz_with(&pre_state_path, spec, BeaconState::from_ssz_bytes)?; let block = load_from_ssz_with(&block_path, spec, SignedBeaconBlock::from_ssz_bytes)?; (pre_state, None, block) From b0b606dabe5c1775c6cb1436bf459cd5c888e97c Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Fri, 16 Sep 2022 08:54:06 +0000 Subject: [PATCH 03/10] Use `SmallVec` for `TreeHash` packed encoding (#3581) ## Issue Addressed NA ## Proposed Changes I've noticed that our block hashing times increase significantly after the merge. I did some flamegraph-ing and noticed that we're allocating a `Vec` for each byte of each execution payload transaction. 
This seems like unnecessary work and a bit of a fragmentation risk. This PR switches to `SmallVec<[u8; 32]>` for the packed encoding of `TreeHash`. I believe this is a nice simple optimisation with no downside. ### Benchmarking These numbers were computed using #3580 on my desktop (i7 hex-core). You can see a bit of noise in the numbers, that's probably just my computer doing other things. Generally I found this change takes the time from 10-11ms to 8-9ms. I can also see all the allocations disappear from flamegraph. This is the block being benchmarked: https://beaconcha.in/slot/4704236 #### Before ``` [2022-09-15T21:44:19Z INFO lcli::block_root] Run 980: 10.553003ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 981: 10.563737ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 982: 10.646352ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 983: 10.628532ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 984: 10.552112ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 985: 10.587778ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 986: 10.640526ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 987: 10.587243ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 988: 10.554748ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 989: 10.551111ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 990: 11.559031ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 991: 11.944827ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 992: 10.554308ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 993: 11.043397ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 994: 11.043315ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 995: 11.207711ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 996: 11.056246ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 997: 11.049706ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 998: 11.432449ms [2022-09-15T21:44:19Z INFO lcli::block_root] Run 999: 11.149617ms ``` #### After ``` [2022-09-15T21:41:49Z INFO lcli::block_root] Run 
980: 14.011653ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 981: 8.925314ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 982: 8.849563ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 983: 8.893689ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 984: 8.902964ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 985: 8.942067ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 986: 8.907088ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 987: 9.346101ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 988: 8.96142ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 989: 9.366437ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 990: 9.809334ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 991: 9.541561ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 992: 11.143518ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 993: 10.821181ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 994: 9.855973ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 995: 10.941006ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 996: 9.596155ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 997: 9.121739ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 998: 9.090019ms [2022-09-15T21:41:49Z INFO lcli::block_root] Run 999: 9.071885ms ``` ## Additional Info Please provide any additional information. For example, future considerations or information useful for reviewers. 
--- consensus/ssz_types/src/bitfield.rs | 4 +-- consensus/ssz_types/src/fixed_vector.rs | 2 +- consensus/ssz_types/src/variable_list.rs | 2 +- consensus/tree_hash/src/impls.rs | 30 ++++++++++----------- consensus/tree_hash/src/lib.rs | 11 +++++--- consensus/tree_hash/tests/tests.rs | 4 +-- consensus/tree_hash_derive/src/lib.rs | 6 ++--- consensus/types/src/execution_block_hash.rs | 2 +- consensus/types/src/graffiti.rs | 4 +-- consensus/types/src/participation_flags.rs | 4 +-- consensus/types/src/payload.rs | 6 ++--- consensus/types/src/slot_epoch.rs | 2 +- consensus/types/src/slot_epoch_macros.rs | 4 +-- crypto/bls/src/macros.rs | 2 +- testing/ef_tests/src/cases/common.rs | 2 +- 15 files changed, 44 insertions(+), 41 deletions(-) diff --git a/consensus/ssz_types/src/bitfield.rs b/consensus/ssz_types/src/bitfield.rs index 599170fa29..b0cf4551ee 100644 --- a/consensus/ssz_types/src/bitfield.rs +++ b/consensus/ssz_types/src/bitfield.rs @@ -620,7 +620,7 @@ impl tree_hash::TreeHash for Bitfield> { tree_hash::TreeHashType::List } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { unreachable!("List should never be packed.") } @@ -641,7 +641,7 @@ impl tree_hash::TreeHash for Bitfield> { tree_hash::TreeHashType::Vector } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { unreachable!("Vector should never be packed.") } diff --git a/consensus/ssz_types/src/fixed_vector.rs b/consensus/ssz_types/src/fixed_vector.rs index 5f7a4af962..e64e76ef4d 100644 --- a/consensus/ssz_types/src/fixed_vector.rs +++ b/consensus/ssz_types/src/fixed_vector.rs @@ -167,7 +167,7 @@ where tree_hash::TreeHashType::Vector } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { unreachable!("Vector should never be packed.") } diff --git a/consensus/ssz_types/src/variable_list.rs 
b/consensus/ssz_types/src/variable_list.rs index 49f8004b22..f23872c87f 100644 --- a/consensus/ssz_types/src/variable_list.rs +++ b/consensus/ssz_types/src/variable_list.rs @@ -184,7 +184,7 @@ where tree_hash::TreeHashType::List } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { unreachable!("List should never be packed.") } diff --git a/consensus/tree_hash/src/impls.rs b/consensus/tree_hash/src/impls.rs index 00fed489c7..cf05d2a3d5 100644 --- a/consensus/tree_hash/src/impls.rs +++ b/consensus/tree_hash/src/impls.rs @@ -14,8 +14,8 @@ macro_rules! impl_for_bitsize { TreeHashType::Basic } - fn tree_hash_packed_encoding(&self) -> Vec { - self.to_le_bytes().to_vec() + fn tree_hash_packed_encoding(&self) -> PackedEncoding { + PackedEncoding::from_slice(&self.to_le_bytes()) } fn tree_hash_packing_factor() -> usize { @@ -41,7 +41,7 @@ impl TreeHash for bool { TreeHashType::Basic } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> PackedEncoding { (*self as u8).tree_hash_packed_encoding() } @@ -62,7 +62,7 @@ macro_rules! 
impl_for_lt_32byte_u8_array { TreeHashType::Vector } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> PackedEncoding { unreachable!("bytesN should never be packed.") } @@ -87,10 +87,10 @@ impl TreeHash for U128 { TreeHashType::Basic } - fn tree_hash_packed_encoding(&self) -> Vec { - let mut result = vec![0; 16]; + fn tree_hash_packed_encoding(&self) -> PackedEncoding { + let mut result = [0; 16]; self.to_little_endian(&mut result); - result + PackedEncoding::from_slice(&result) } fn tree_hash_packing_factor() -> usize { @@ -109,10 +109,10 @@ impl TreeHash for U256 { TreeHashType::Basic } - fn tree_hash_packed_encoding(&self) -> Vec { - let mut result = vec![0; 32]; + fn tree_hash_packed_encoding(&self) -> PackedEncoding { + let mut result = [0; 32]; self.to_little_endian(&mut result); - result + PackedEncoding::from_slice(&result) } fn tree_hash_packing_factor() -> usize { @@ -131,10 +131,10 @@ impl TreeHash for H160 { TreeHashType::Vector } - fn tree_hash_packed_encoding(&self) -> Vec { - let mut result = vec![0; 32]; + fn tree_hash_packed_encoding(&self) -> PackedEncoding { + let mut result = [0; 32]; result[0..20].copy_from_slice(self.as_bytes()); - result + PackedEncoding::from_slice(&result) } fn tree_hash_packing_factor() -> usize { @@ -153,8 +153,8 @@ impl TreeHash for H256 { TreeHashType::Vector } - fn tree_hash_packed_encoding(&self) -> Vec { - self.as_bytes().to_vec() + fn tree_hash_packed_encoding(&self) -> PackedEncoding { + PackedEncoding::from_slice(self.as_bytes()) } fn tree_hash_packing_factor() -> usize { diff --git a/consensus/tree_hash/src/lib.rs b/consensus/tree_hash/src/lib.rs index 997ae867f7..ec40de9160 100644 --- a/consensus/tree_hash/src/lib.rs +++ b/consensus/tree_hash/src/lib.rs @@ -8,13 +8,16 @@ pub use merkleize_padded::merkleize_padded; pub use merkleize_standard::merkleize_standard; use eth2_hashing::{hash_fixed, ZERO_HASHES, ZERO_HASHES_MAX_INDEX}; +use smallvec::SmallVec; pub const 
BYTES_PER_CHUNK: usize = 32; pub const HASHSIZE: usize = 32; pub const MERKLE_HASH_CHUNK: usize = 2 * BYTES_PER_CHUNK; pub const MAX_UNION_SELECTOR: u8 = 127; +pub const SMALLVEC_SIZE: usize = 32; pub type Hash256 = ethereum_types::H256; +pub type PackedEncoding = SmallVec<[u8; SMALLVEC_SIZE]>; /// Convenience method for `MerkleHasher` which also provides some fast-paths for small trees. /// @@ -109,7 +112,7 @@ pub enum TreeHashType { pub trait TreeHash { fn tree_hash_type() -> TreeHashType; - fn tree_hash_packed_encoding(&self) -> Vec; + fn tree_hash_packed_encoding(&self) -> PackedEncoding; fn tree_hash_packing_factor() -> usize; @@ -125,7 +128,7 @@ where T::tree_hash_type() } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> PackedEncoding { T::tree_hash_packed_encoding(*self) } @@ -146,7 +149,7 @@ macro_rules! tree_hash_ssz_encoding_as_vector { tree_hash::TreeHashType::Vector } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> PackedEncoding { unreachable!("Vector should never be packed.") } @@ -169,7 +172,7 @@ macro_rules! 
tree_hash_ssz_encoding_as_list { tree_hash::TreeHashType::List } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> PackedEncoding { unreachable!("List should never be packed.") } diff --git a/consensus/tree_hash/tests/tests.rs b/consensus/tree_hash/tests/tests.rs index b7f7178d06..8b2a4b21be 100644 --- a/consensus/tree_hash/tests/tests.rs +++ b/consensus/tree_hash/tests/tests.rs @@ -1,5 +1,5 @@ use ssz_derive::Encode; -use tree_hash::{Hash256, MerkleHasher, TreeHash, BYTES_PER_CHUNK}; +use tree_hash::{Hash256, MerkleHasher, PackedEncoding, TreeHash, BYTES_PER_CHUNK}; use tree_hash_derive::TreeHash; #[derive(Encode)] @@ -18,7 +18,7 @@ impl tree_hash::TreeHash for HashVec { tree_hash::TreeHashType::List } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> PackedEncoding { unreachable!("List should never be packed.") } diff --git a/consensus/tree_hash_derive/src/lib.rs b/consensus/tree_hash_derive/src/lib.rs index f65be1b6b1..21ff324d54 100644 --- a/consensus/tree_hash_derive/src/lib.rs +++ b/consensus/tree_hash_derive/src/lib.rs @@ -150,7 +150,7 @@ fn tree_hash_derive_struct(item: &DeriveInput, struct_data: &DataStruct) -> Toke tree_hash::TreeHashType::Container } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { unreachable!("Struct should never be packed.") } @@ -231,7 +231,7 @@ fn tree_hash_derive_enum_transparent( tree_hash::TreeHashType::Container } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { unreachable!("Enum should never be packed") } @@ -288,7 +288,7 @@ fn tree_hash_derive_enum_union(derive_input: &DeriveInput, enum_data: &DataEnum) tree_hash::TreeHashType::Container } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { unreachable!("Enum should never be packed") } diff --git 
a/consensus/types/src/execution_block_hash.rs b/consensus/types/src/execution_block_hash.rs index 978bd4c69a..988dcece5e 100644 --- a/consensus/types/src/execution_block_hash.rs +++ b/consensus/types/src/execution_block_hash.rs @@ -67,7 +67,7 @@ impl tree_hash::TreeHash for ExecutionBlockHash { Hash256::tree_hash_type() } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { self.0.tree_hash_packed_encoding() } diff --git a/consensus/types/src/graffiti.rs b/consensus/types/src/graffiti.rs index f5f74b601b..73beb82649 100644 --- a/consensus/types/src/graffiti.rs +++ b/consensus/types/src/graffiti.rs @@ -7,7 +7,7 @@ use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; use ssz::{Decode, DecodeError, Encode}; use std::fmt; use std::str::FromStr; -use tree_hash::TreeHash; +use tree_hash::{PackedEncoding, TreeHash}; pub const GRAFFITI_BYTES_LEN: usize = 32; @@ -159,7 +159,7 @@ impl TreeHash for Graffiti { <[u8; GRAFFITI_BYTES_LEN]>::tree_hash_type() } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> PackedEncoding { self.0.tree_hash_packed_encoding() } diff --git a/consensus/types/src/participation_flags.rs b/consensus/types/src/participation_flags.rs index 74c2cf73ba..a2dd494864 100644 --- a/consensus/types/src/participation_flags.rs +++ b/consensus/types/src/participation_flags.rs @@ -3,7 +3,7 @@ use safe_arith::{ArithError, SafeArith}; use serde_derive::{Deserialize, Serialize}; use ssz::{Decode, DecodeError, Encode}; use test_random_derive::TestRandom; -use tree_hash::{TreeHash, TreeHashType}; +use tree_hash::{PackedEncoding, TreeHash, TreeHashType}; #[derive(Debug, Default, Clone, Copy, PartialEq, Deserialize, Serialize, TestRandom)] #[serde(transparent)] @@ -78,7 +78,7 @@ impl TreeHash for ParticipationFlags { u8::tree_hash_type() } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> PackedEncoding { 
self.bits.tree_hash_packed_encoding() } diff --git a/consensus/types/src/payload.rs b/consensus/types/src/payload.rs index 114ca02ecf..667fff58c7 100644 --- a/consensus/types/src/payload.rs +++ b/consensus/types/src/payload.rs @@ -7,7 +7,7 @@ use std::convert::TryFrom; use std::fmt::Debug; use std::hash::Hash; use test_random_derive::TestRandom; -use tree_hash::TreeHash; +use tree_hash::{PackedEncoding, TreeHash}; #[derive(Debug)] pub enum BlockType { @@ -175,7 +175,7 @@ impl TreeHash for BlindedPayload { >::tree_hash_type() } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> PackedEncoding { self.execution_payload_header.tree_hash_packed_encoding() } @@ -244,7 +244,7 @@ impl TreeHash for FullPayload { >::tree_hash_type() } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { self.execution_payload.tree_hash_packed_encoding() } diff --git a/consensus/types/src/slot_epoch.rs b/consensus/types/src/slot_epoch.rs index e68055d34d..277aa9deae 100644 --- a/consensus/types/src/slot_epoch.rs +++ b/consensus/types/src/slot_epoch.rs @@ -16,7 +16,7 @@ use crate::{ChainSpec, SignedRoot}; use rand::RngCore; use safe_arith::{ArithError, SafeArith}; use serde_derive::{Deserialize, Serialize}; -use ssz::{ssz_encode, Decode, DecodeError, Encode}; +use ssz::{Decode, DecodeError, Encode}; use std::fmt; use std::hash::Hash; use std::iter::Iterator; diff --git a/consensus/types/src/slot_epoch_macros.rs b/consensus/types/src/slot_epoch_macros.rs index b8e6202fe3..fafc455ef8 100644 --- a/consensus/types/src/slot_epoch_macros.rs +++ b/consensus/types/src/slot_epoch_macros.rs @@ -290,8 +290,8 @@ macro_rules! 
impl_ssz { tree_hash::TreeHashType::Basic } - fn tree_hash_packed_encoding(&self) -> Vec { - ssz_encode(self) + fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { + self.0.tree_hash_packed_encoding() } fn tree_hash_packing_factor() -> usize { diff --git a/crypto/bls/src/macros.rs b/crypto/bls/src/macros.rs index a5fce70a96..f3a7374ba7 100644 --- a/crypto/bls/src/macros.rs +++ b/crypto/bls/src/macros.rs @@ -7,7 +7,7 @@ macro_rules! impl_tree_hash { tree_hash::TreeHashType::Vector } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { unreachable!("Vector should never be packed.") } diff --git a/testing/ef_tests/src/cases/common.rs b/testing/ef_tests/src/cases/common.rs index ade8711cdc..e77e561939 100644 --- a/testing/ef_tests/src/cases/common.rs +++ b/testing/ef_tests/src/cases/common.rs @@ -43,7 +43,7 @@ macro_rules! uint_wrapper { <$wrapped_type>::tree_hash_type() } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { self.x.tree_hash_packed_encoding() } From 5b2843c2cdba11dc61d5646fa11ae30c8b3ab695 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 16 Sep 2022 11:54:17 +0000 Subject: [PATCH 04/10] Pre-allocate vectors in SSZ decoding (#3417) ## Issue Addressed Fixes a potential regression in memory fragmentation identified by @paulhauner here: https://github.com/sigp/lighthouse/pull/3371#discussion_r931770045. ## Proposed Changes Immediately allocate a vector with sufficient size to hold all decoded elements in SSZ decoding. The `size_hint` is derived from the range iterator here: https://github.com/sigp/lighthouse/blob/2983235650811437b44199f9c94e517e948a1e9b/consensus/ssz/src/decode/impls.rs#L489 ## Additional Info I'd like to test this out on some infra for a substantial duration to see if it affects total fragmentation. 
--- consensus/ssz/src/decode/try_from_iter.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/consensus/ssz/src/decode/try_from_iter.rs b/consensus/ssz/src/decode/try_from_iter.rs index 22db02d4fc..1ff89a107f 100644 --- a/consensus/ssz/src/decode/try_from_iter.rs +++ b/consensus/ssz/src/decode/try_from_iter.rs @@ -29,11 +29,18 @@ pub trait TryFromIter: Sized { impl TryFromIter for Vec { type Error = Infallible; - fn try_from_iter(iter: I) -> Result + fn try_from_iter(values: I) -> Result where I: IntoIterator, { - Ok(Self::from_iter(iter)) + // Pre-allocate the expected size of the Vec, which is parsed from the SSZ input bytes as + // `num_items`. This length has already been checked to be less than or equal to the type's + // maximum length in `decode_list_of_variable_length_items`. + let iter = values.into_iter(); + let (_, opt_max_len) = iter.size_hint(); + let mut vec = Vec::with_capacity(opt_max_len.unwrap_or(0)); + vec.extend(iter); + Ok(vec) } } From ca42ef2e5a5eaa29146631526f3b489ccbec7caa Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Sat, 17 Sep 2022 02:27:01 +0000 Subject: [PATCH 05/10] Prune finalized execution payloads (#3565) ## Issue Addressed Closes https://github.com/sigp/lighthouse/issues/3556 ## Proposed Changes Delete finalized execution payloads from the database in two places: 1. When running the finalization migration in `migrate_database`. We delete the finalized payloads between the last split point and the new updated split point. _If_ payloads are already pruned prior to this then this is sufficient to prune _all_ payloads as non-canonical payloads are already deleted by the head pruner, and all canonical payloads prior to the previous split will already have been pruned. 2. To address the fact that users will update to this code _after_ the merge on mainnet (and testnets), we need a one-off scan to delete the finalized payloads from the canonical chain. 
This is implemented in `try_prune_execution_payloads` which runs on startup and scans the chain back to the Bellatrix fork or the anchor slot (if checkpoint synced after Bellatrix). In the case where payloads are already pruned this check only imposes a single state load for the split state, which shouldn't be _too slow_. Even so, a flag `--prune-payloads-on-startup=false` is provided to turn this off after it has run the first time, which provides faster start-up times. There is also a new `lighthouse db prune_payloads` subcommand for users who prefer to run the pruning manually. ## Additional Info The tests have been updated to not rely on finalized payloads in the database, instead using the `MockExecutionLayer` to reconstruct them. Additionally a check was added to `check_chain_dump` which asserts the non-existence or existence of payloads on disk depending on their slot. --- beacon_node/beacon_chain/src/builder.rs | 7 ++ .../beacon_chain/tests/block_verification.rs | 33 +++--- beacon_node/beacon_chain/tests/store_tests.rs | 29 ++++- beacon_node/src/cli.rs | 7 ++ beacon_node/src/config.rs | 6 + beacon_node/store/src/config.rs | 3 + beacon_node/store/src/hot_cold_store.rs | 110 +++++++++++++++++- database_manager/src/lib.rs | 32 +++++ lighthouse/tests/beacon_node.rs | 13 +++ 9 files changed, 211 insertions(+), 29 deletions(-) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index a578629b69..45375ec01e 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -266,6 +266,13 @@ where self.genesis_time = Some(genesis_state.genesis_time()); + // Prune finalized execution payloads. 
+ if store.get_config().prune_payloads_on_init { + store + .try_prune_execution_payloads(false) + .map_err(|e| format!("Error pruning execution payloads: {e:?}"))?; + } + self.op_pool = Some( store .get_item::>(&OP_POOL_DB_KEY) diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index c2283321cb..17c84bd697 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -41,28 +41,27 @@ async fn get_chain_segment() -> Vec> { ) .await; - harness + let mut segment = Vec::with_capacity(CHAIN_SEGMENT_LENGTH); + for snapshot in harness .chain .chain_dump() .expect("should dump chain") .into_iter() - .map(|snapshot| { - let full_block = harness - .chain - .store - .make_full_block( - &snapshot.beacon_block_root, - snapshot.beacon_block.as_ref().clone(), - ) - .unwrap(); - BeaconSnapshot { - beacon_block_root: snapshot.beacon_block_root, - beacon_block: Arc::new(full_block), - beacon_state: snapshot.beacon_state, - } - }) .skip(1) - .collect() + { + let full_block = harness + .chain + .get_block(&snapshot.beacon_block_root) + .await + .unwrap() + .unwrap(); + segment.push(BeaconSnapshot { + beacon_block_root: snapshot.beacon_block_root, + beacon_block: Arc::new(full_block), + beacon_state: snapshot.beacon_state, + }); + } + segment } fn get_harness(validator_count: usize) -> BeaconChainHarness> { diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index afd97750a6..b85ff50efb 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2114,14 +2114,16 @@ async fn weak_subjectivity_sync() { assert_eq!(new_blocks[0].beacon_block.slot(), wss_slot + 1); for snapshot in new_blocks { - let block = &snapshot.beacon_block; let full_block = harness .chain - .store - .make_full_block(&snapshot.beacon_block_root, block.as_ref().clone()) + 
.get_block(&snapshot.beacon_block_root) + .await + .unwrap() .unwrap(); + let slot = full_block.slot(); + let state_root = full_block.state_root(); - beacon_chain.slot_clock.set_slot(block.slot().as_u64()); + beacon_chain.slot_clock.set_slot(slot.as_u64()); beacon_chain .process_block(Arc::new(full_block), CountUnrealized::True) .await @@ -2129,10 +2131,9 @@ async fn weak_subjectivity_sync() { beacon_chain.recompute_head_at_current_slot().await; // Check that the new block's state can be loaded correctly. - let state_root = block.state_root(); let mut state = beacon_chain .store - .get_state(&state_root, Some(block.slot())) + .get_state(&state_root, Some(slot)) .unwrap() .unwrap(); assert_eq!(state.update_tree_hash_cache().unwrap(), state_root); @@ -2583,6 +2584,7 @@ fn check_split_slot(harness: &TestHarness, store: Arc, L /// Check that all the states in a chain dump have the correct tree hash. fn check_chain_dump(harness: &TestHarness, expected_len: u64) { let chain_dump = harness.chain.chain_dump().unwrap(); + let split_slot = harness.chain.store.get_split_slot(); assert_eq!(chain_dump.len() as u64, expected_len); @@ -2606,6 +2608,21 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) { .slot(), checkpoint.beacon_state.slot() ); + + // Check presence of execution payload on disk. 
+ if harness.chain.spec.bellatrix_fork_epoch.is_some() { + assert_eq!( + harness + .chain + .store + .execution_payload_exists(&checkpoint.beacon_block_root) + .unwrap(), + checkpoint.beacon_block.slot() >= split_slot, + "incorrect payload storage for block at slot {}: {:?}", + checkpoint.beacon_block.slot(), + checkpoint.beacon_block_root, + ); + } } // Check the forwards block roots iterator against the chain dump diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 3c421a1a3f..d9d5c715b4 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -515,6 +515,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) .default_value("true") ) + .arg( + Arg::with_name("prune-payloads-on-startup") + .long("prune-payloads-on-startup") + .help("Check for execution payloads to prune on start-up.") + .takes_value(true) + .default_value("true") + ) /* * Misc. diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index b57ba02687..368fce573d 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -358,6 +358,12 @@ pub fn get_config( .map_err(|_| "auto-compact-db takes a boolean".to_string())?; } + if let Some(prune_payloads_on_init) = + clap_utils::parse_optional(cli_args, "prune-payloads-on-startup")? + { + client_config.store.prune_payloads_on_init = prune_payloads_on_init; + } + /* * Zero-ports * diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 4268ec2e91..9bc9ee8a45 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -21,6 +21,8 @@ pub struct StoreConfig { pub compact_on_init: bool, /// Whether to compact the database during database pruning. pub compact_on_prune: bool, + /// Whether to try pruning execution payloads on initialization. + pub prune_payloads_on_init: bool, } /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params. 
@@ -43,6 +45,7 @@ impl Default for StoreConfig { block_cache_size: DEFAULT_BLOCK_CACHE_SIZE, compact_on_init: false, compact_on_prune: true, + prune_payloads_on_init: true, } } } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index c4b4a64a05..80de674e9a 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -7,7 +7,7 @@ use crate::config::{ }; use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator}; use crate::impls::beacon_state::{get_full_state, store_full_state}; -use crate::iter::{ParentRootBlockIterator, StateRootsIterator}; +use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator}; use crate::leveldb_store::BytesKey; use crate::leveldb_store::LevelDB; use crate::memory_store::MemoryStore; @@ -438,6 +438,12 @@ impl, Cold: ItemStore> HotColdDB .ok_or_else(|| HotColdDBError::MissingExecutionPayload(*block_root).into()) } + /// Check if the execution payload for a block exists on disk. + pub fn execution_payload_exists(&self, block_root: &Hash256) -> Result { + self.get_item::>(block_root) + .map(|payload| payload.is_some()) + } + /// Determine whether a block exists in the database. pub fn block_exists(&self, block_root: &Hash256) -> Result { self.hot_db @@ -1418,6 +1424,93 @@ impl, Cold: ItemStore> HotColdDB &CompactionTimestamp(compaction_timestamp.as_secs()), ) } + + /// Try to prune all execution payloads, returning early if there is no need to prune. + pub fn try_prune_execution_payloads(&self, force: bool) -> Result<(), Error> { + let split = self.get_split_info(); + + if split.slot == 0 { + return Ok(()); + } + + let bellatrix_fork_slot = if let Some(epoch) = self.spec.bellatrix_fork_epoch { + epoch.start_slot(E::slots_per_epoch()) + } else { + return Ok(()); + }; + + // Load the split state so we can backtrack to find execution payloads. 
+ let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or( + HotColdDBError::MissingSplitState(split.state_root, split.slot), + )?; + + // The finalized block may or may not have its execution payload stored, depending on + // whether it was at a skipped slot. However for a fully pruned database its parent + // should *always* have been pruned. + let split_parent_block_root = split_state.get_block_root(split.slot - 1)?; + if !self.execution_payload_exists(split_parent_block_root)? && !force { + info!(self.log, "Execution payloads are pruned"); + return Ok(()); + } + + // Iterate block roots backwards to the Bellatrix fork or the anchor slot, whichever comes + // first. + let split_block_root = split_state.get_latest_block_root(split.state_root); + let anchor_slot = self.get_anchor_info().map(|info| info.anchor_slot); + + let mut ops = vec![]; + + for res in std::iter::once(Ok((split_block_root, split.slot))) + .chain(BlockRootsIterator::new(self, &split_state)) + { + let (block_root, slot) = match res { + Ok(tuple) => tuple, + Err(e) => { + warn!( + self.log, + "Stopping backtrack early"; + "error" => ?e, + ); + break; + } + }; + + if slot < bellatrix_fork_slot { + info!( + self.log, + "Finished backtrack to Bellatrix fork"; + ); + break; + } + + if self.execution_payload_exists(&block_root)? { + debug!( + self.log, + "Pruning execution payload"; + "slot" => slot, + "block_root" => ?block_root, + ); + ops.push(StoreOp::DeleteExecutionPayload(block_root)); + } + + if Some(slot) == anchor_slot { + info!( + self.log, + "Finished backtrack to anchor state"; + "slot" => slot + ); + break; + } + } + let payloads_pruned = ops.len(); + self.do_atomically(ops)?; + info!( + self.log, + "Execution payload pruning complete"; + "payloads_pruned" => payloads_pruned, + ); + Ok(()) + } } /// Advance the split point of the store, moving new finalized states to the freezer. 
@@ -1457,16 +1550,16 @@ pub fn migrate_database, Cold: ItemStore>( let mut hot_db_ops: Vec> = Vec::new(); // 1. Copy all of the states between the head and the split slot, from the hot DB - // to the cold DB. - let state_root_iter = StateRootsIterator::new(&store, frozen_head); - for maybe_pair in state_root_iter.take_while(|result| match result { - Ok((_, slot)) => { + // to the cold DB. Delete the execution payloads of these now-finalized blocks. + let state_root_iter = RootsIterator::new(&store, frozen_head); + for maybe_tuple in state_root_iter.take_while(|result| match result { + Ok((_, _, slot)) => { slot >= ¤t_split_slot && anchor_slot.map_or(true, |anchor_slot| slot >= &anchor_slot) } Err(_) => true, }) { - let (state_root, slot) = maybe_pair?; + let (block_root, state_root, slot) = maybe_tuple?; let mut cold_db_ops: Vec = Vec::new(); @@ -1489,6 +1582,11 @@ pub fn migrate_database, Cold: ItemStore>( // Delete the old summary, and the full state if we lie on an epoch boundary. hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot))); + + // Delete the execution payload. Even if this execution payload is the payload of the + // new finalized block it is OK to delete it, as `try_get_full_block` looks at the split + // slot when determining whether to reconstruct payloads. + hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root)); } // Warning: Critical section. 
We have to take care not to put any of the two databases in an diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 50295df4b0..20147adb9f 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -59,6 +59,12 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> { ) } +pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> { + App::new("prune_payloads") + .setting(clap::AppSettings::ColoredHelp) + .about("Prune finalized execution payloads") +} + pub fn cli_app<'a, 'b>() -> App<'a, 'b> { App::new(CMD) .visible_aliases(&["db"]) @@ -85,6 +91,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .subcommand(migrate_cli_app()) .subcommand(version_cli_app()) .subcommand(inspect_cli_app()) + .subcommand(prune_payloads_app()) } fn parse_client_config( @@ -257,6 +264,30 @@ pub fn migrate_db( ) } +pub fn prune_payloads( + client_config: ClientConfig, + runtime_context: &RuntimeContext, + log: Logger, +) -> Result<(), Error> { + let spec = &runtime_context.eth2_config.spec; + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + + let db = HotColdDB::, LevelDB>::open( + &hot_path, + &cold_path, + |_, _, _| Ok(()), + client_config.store, + spec.clone(), + log, + )?; + + // If we're triggering a prune manually then ignore the check on the split's parent that bails + out early. + let force = true; + db.try_prune_execution_payloads(force) +} + /// Run the database manager, returning an error string if the operation did not succeed. 
pub fn run(cli_args: &ArgMatches<'_>, mut env: Environment) -> Result<(), String> { let client_config = parse_client_config(cli_args, &env)?; @@ -273,6 +304,7 @@ pub fn run(cli_args: &ArgMatches<'_>, mut env: Environment) -> Re let inspect_config = parse_inspect_config(cli_args)?; inspect_db(inspect_config, client_config, &context, log) } + ("prune_payloads", Some(_)) => prune_payloads(client_config, &context, log), _ => { return Err("Unknown subcommand, for help `lighthouse database_manager --help`".into()) } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index b28c1a0c3e..aed8ebf394 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1227,6 +1227,19 @@ fn compact_db_flag() { .with_config(|config| assert!(config.store.compact_on_init)); } #[test] +fn prune_payloads_on_startup_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert!(config.store.prune_payloads_on_init)); +} +#[test] +fn prune_payloads_on_startup_false() { + CommandLineTest::new() + .flag("prune-payloads-on-startup", Some("false")) + .run_with_zero_port() + .with_config(|config| assert!(!config.store.prune_payloads_on_init)); +} +#[test] fn reconstruct_historic_states_flag() { CommandLineTest::new() .flag("reconstruct-historic-states", None) From f2ac0738d8e9ebc54561ae35abe5538e01da95a4 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 19 Sep 2022 07:58:48 +0000 Subject: [PATCH 06/10] Implement `skip_randao_verification` and blinded block rewards API (#3540) ## Issue Addressed https://github.com/ethereum/beacon-APIs/pull/222 ## Proposed Changes Update Lighthouse's randao verification API to match the `beacon-APIs` spec. We implemented the API before spec stabilisation, and it changed slightly in the course of review. Rather than a flag `verify_randao` taking a boolean value, the new API uses a `skip_randao_verification` flag which takes no argument. 
The new spec also requires the randao reveal to be present and equal to the point-at-infinity when `skip_randao_verification` is set. I've also updated the `POST /lighthouse/analysis/block_rewards` API to take blinded blocks as input, as the execution payload is irrelevant and we may want to assess blocks produced by builders. ## Additional Info This is technically a breaking change, but seeing as I suspect I'm the only one using these parameters/APIs, I think we're OK to include this in a patch release. --- beacon_node/http_api/src/block_rewards.rs | 4 +- beacon_node/http_api/src/lib.rs | 92 ++++++++++------------- beacon_node/http_api/tests/tests.rs | 76 ++++--------------- common/eth2/src/lib.rs | 43 +++++------ common/eth2/src/types.rs | 30 ++++++-- consensus/types/src/beacon_block.rs | 2 + consensus/types/src/lib.rs | 2 +- crypto/bls/src/generic_signature.rs | 12 +++ 8 files changed, 116 insertions(+), 145 deletions(-) diff --git a/beacon_node/http_api/src/block_rewards.rs b/beacon_node/http_api/src/block_rewards.rs index 3b81b894db..05886a4d02 100644 --- a/beacon_node/http_api/src/block_rewards.rs +++ b/beacon_node/http_api/src/block_rewards.rs @@ -4,7 +4,7 @@ use lru::LruCache; use slog::{debug, warn, Logger}; use state_processing::BlockReplayer; use std::sync::Arc; -use types::BeaconBlock; +use types::BlindedBeaconBlock; use warp_utils::reject::{ beacon_chain_error, beacon_state_error, custom_bad_request, custom_server_error, }; @@ -96,7 +96,7 @@ pub fn get_block_rewards( /// Compute block rewards for blocks passed in as input. 
pub fn compute_block_rewards( - blocks: Vec>, + blocks: Vec>, chain: Arc>, log: Logger, ) -> Result, warp::Rejection> { diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index a21b674175..5c2660b303 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -25,7 +25,9 @@ use beacon_chain::{ BeaconChainTypes, ProduceBlockVerification, WhenSlotSkipped, }; pub use block_id::BlockId; -use eth2::types::{self as api_types, EndpointVersion, ValidatorId, ValidatorStatus}; +use eth2::types::{ + self as api_types, EndpointVersion, SkipRandaoVerification, ValidatorId, ValidatorStatus, +}; use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage}; use lighthouse_version::version_with_platform; use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage}; @@ -35,7 +37,6 @@ use slot_clock::SlotClock; use ssz::Encode; pub use state_id::StateId; use std::borrow::Cow; -use std::convert::TryInto; use std::future::Future; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; @@ -46,7 +47,7 @@ use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use types::{ Attestation, AttestationData, AttesterSlashing, BeaconStateError, BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, - ProposerPreparationData, ProposerSlashing, RelativeEpoch, Signature, SignedAggregateAndProof, + ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBeaconBlock, SignedBlindedBeaconBlock, SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, @@ -2002,31 +2003,25 @@ pub fn serve( slot: Slot, query: api_types::ValidatorBlocksQuery, chain: Arc>| async move { - let randao_reveal = query.randao_reveal.as_ref().map_or_else( - || { - if query.verify_randao { - Err(warp_utils::reject::custom_bad_request( - "randao_reveal is mandatory unless 
verify_randao=false".into(), - )) - } else { - Ok(Signature::empty()) - } - }, - |sig_bytes| { - sig_bytes.try_into().map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "randao reveal is not a valid BLS signature: {:?}", - e - )) - }) - }, - )?; + let randao_reveal = query.randao_reveal.decompress().map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "randao reveal is not a valid BLS signature: {:?}", + e + )) + })?; - let randao_verification = if query.verify_randao { - ProduceBlockVerification::VerifyRandao - } else { - ProduceBlockVerification::NoVerification - }; + let randao_verification = + if query.skip_randao_verification == SkipRandaoVerification::Yes { + if !randao_reveal.is_infinity() { + return Err(warp_utils::reject::custom_bad_request( + "randao_reveal must be point-at-infinity if verification is skipped" + .into(), + )); + } + ProduceBlockVerification::NoVerification + } else { + ProduceBlockVerification::VerifyRandao + }; let (block, _) = chain .produce_block_with_verification::>( @@ -2064,31 +2059,25 @@ pub fn serve( |slot: Slot, query: api_types::ValidatorBlocksQuery, chain: Arc>| async move { - let randao_reveal = query.randao_reveal.as_ref().map_or_else( - || { - if query.verify_randao { - Err(warp_utils::reject::custom_bad_request( - "randao_reveal is mandatory unless verify_randao=false".into(), - )) - } else { - Ok(Signature::empty()) - } - }, - |sig_bytes| { - sig_bytes.try_into().map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "randao reveal is not a valid BLS signature: {:?}", - e - )) - }) - }, - )?; + let randao_reveal = query.randao_reveal.decompress().map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "randao reveal is not a valid BLS signature: {:?}", + e + )) + })?; - let randao_verification = if query.verify_randao { - ProduceBlockVerification::VerifyRandao - } else { - ProduceBlockVerification::NoVerification - }; + let randao_verification = + if 
query.skip_randao_verification == SkipRandaoVerification::Yes { + if !randao_reveal.is_infinity() { + return Err(warp_utils::reject::custom_bad_request( + "randao_reveal must be point-at-infinity if verification is skipped" + .into() + )); + } + ProduceBlockVerification::NoVerification + } else { + ProduceBlockVerification::VerifyRandao + }; let (block, _) = chain .produce_block_with_verification::>( @@ -2103,6 +2092,7 @@ pub fn serve( .to_ref() .fork_name(&chain.spec) .map_err(inconsistent_fork_rejection)?; + // Pose as a V2 endpoint so we return the fork `version`. fork_versioned_response(V2, fork_name, block) .map(|response| warp::reply::json(&response)) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index ca240e64d2..ff664d6ff0 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1939,11 +1939,11 @@ impl ApiTester { let block = self .client - .get_validator_blocks_with_verify_randao::>( + .get_validator_blocks_modular::>( slot, + &Signature::infinity().unwrap().into(), None, - None, - Some(false), + SkipRandaoVerification::Yes, ) .await .unwrap() @@ -1993,45 +1993,23 @@ impl ApiTester { sk.sign(message).into() }; - // Check failure with no `verify_randao` passed. + // Check failure with no `skip_randao_verification` passed. self.client .get_validator_blocks::>(slot, &bad_randao_reveal, None) .await .unwrap_err(); - // Check failure with `verify_randao=true`. + // Check failure with `skip_randao_verification` (requires infinity sig). self.client - .get_validator_blocks_with_verify_randao::>( + .get_validator_blocks_modular::>( slot, - Some(&bad_randao_reveal), + &bad_randao_reveal, None, - Some(true), + SkipRandaoVerification::Yes, ) .await .unwrap_err(); - // Check failure with no randao reveal provided. - self.client - .get_validator_blocks_with_verify_randao::>( - slot, None, None, None, - ) - .await - .unwrap_err(); - - // Check success with `verify_randao=false`. 
- let block = self - .client - .get_validator_blocks_with_verify_randao::>( - slot, - Some(&bad_randao_reveal), - None, - Some(false), - ) - .await - .unwrap() - .data; - - assert_eq!(block.slot(), slot); self.chain.slot_clock.set_slot(slot.as_u64() + 1); } @@ -2106,11 +2084,11 @@ impl ApiTester { let block = self .client - .get_validator_blinded_blocks_with_verify_randao::( + .get_validator_blinded_blocks_modular::( slot, + &Signature::infinity().unwrap().into(), None, - None, - Some(false), + SkipRandaoVerification::Yes, ) .await .unwrap() @@ -2162,45 +2140,23 @@ impl ApiTester { sk.sign(message).into() }; - // Check failure with no `verify_randao` passed. + // Check failure with full randao verification enabled. self.client .get_validator_blinded_blocks::(slot, &bad_randao_reveal, None) .await .unwrap_err(); - // Check failure with `verify_randao=true`. + // Check failure with `skip_randao_verification` (requires infinity sig). self.client - .get_validator_blinded_blocks_with_verify_randao::( + .get_validator_blinded_blocks_modular::( slot, - Some(&bad_randao_reveal), + &bad_randao_reveal, None, - Some(true), + SkipRandaoVerification::Yes, ) .await .unwrap_err(); - // Check failure with no randao reveal provided. - self.client - .get_validator_blinded_blocks_with_verify_randao::( - slot, None, None, None, - ) - .await - .unwrap_err(); - - // Check success with `verify_randao=false`. 
- let block = self - .client - .get_validator_blinded_blocks_with_verify_randao::( - slot, - Some(&bad_randao_reveal), - None, - Some(false), - ) - .await - .unwrap() - .data; - - assert_eq!(block.slot(), slot); self.chain.slot_clock.set_slot(slot.as_u64() + 1); } diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index f096aca97e..104ca9ccd4 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -1233,17 +1233,17 @@ impl BeaconNodeHttpClient { randao_reveal: &SignatureBytes, graffiti: Option<&Graffiti>, ) -> Result>, Error> { - self.get_validator_blocks_with_verify_randao(slot, Some(randao_reveal), graffiti, None) + self.get_validator_blocks_modular(slot, randao_reveal, graffiti, SkipRandaoVerification::No) .await } /// `GET v2/validator/blocks/{slot}` - pub async fn get_validator_blocks_with_verify_randao>( + pub async fn get_validator_blocks_modular>( &self, slot: Slot, - randao_reveal: Option<&SignatureBytes>, + randao_reveal: &SignatureBytes, graffiti: Option<&Graffiti>, - verify_randao: Option, + skip_randao_verification: SkipRandaoVerification, ) -> Result>, Error> { let mut path = self.eth_path(V2)?; @@ -1253,19 +1253,17 @@ impl BeaconNodeHttpClient { .push("blocks") .push(&slot.to_string()); - if let Some(randao_reveal) = randao_reveal { - path.query_pairs_mut() - .append_pair("randao_reveal", &randao_reveal.to_string()); - } + path.query_pairs_mut() + .append_pair("randao_reveal", &randao_reveal.to_string()); if let Some(graffiti) = graffiti { path.query_pairs_mut() .append_pair("graffiti", &graffiti.to_string()); } - if let Some(verify_randao) = verify_randao { + if skip_randao_verification == SkipRandaoVerification::Yes { path.query_pairs_mut() - .append_pair("verify_randao", &verify_randao.to_string()); + .append_pair("skip_randao_verification", ""); } self.get(path).await @@ -1278,25 +1276,22 @@ impl BeaconNodeHttpClient { randao_reveal: &SignatureBytes, graffiti: Option<&Graffiti>, ) -> Result>, Error> { - 
self.get_validator_blinded_blocks_with_verify_randao( + self.get_validator_blinded_blocks_modular( slot, - Some(randao_reveal), + randao_reveal, graffiti, - None, + SkipRandaoVerification::No, ) .await } /// `GET v1/validator/blinded_blocks/{slot}` - pub async fn get_validator_blinded_blocks_with_verify_randao< - T: EthSpec, - Payload: ExecPayload, - >( + pub async fn get_validator_blinded_blocks_modular>( &self, slot: Slot, - randao_reveal: Option<&SignatureBytes>, + randao_reveal: &SignatureBytes, graffiti: Option<&Graffiti>, - verify_randao: Option, + skip_randao_verification: SkipRandaoVerification, ) -> Result>, Error> { let mut path = self.eth_path(V1)?; @@ -1306,19 +1301,17 @@ impl BeaconNodeHttpClient { .push("blinded_blocks") .push(&slot.to_string()); - if let Some(randao_reveal) = randao_reveal { - path.query_pairs_mut() - .append_pair("randao_reveal", &randao_reveal.to_string()); - } + path.query_pairs_mut() + .append_pair("randao_reveal", &randao_reveal.to_string()); if let Some(graffiti) = graffiti { path.query_pairs_mut() .append_pair("graffiti", &graffiti.to_string()); } - if let Some(verify_randao) = verify_randao { + if skip_randao_verification == SkipRandaoVerification::Yes { path.query_pairs_mut() - .append_pair("verify_randao", &verify_randao.to_string()); + .append_key_only("skip_randao_verification"); } self.get(path).await diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 0f8ec51233..e657358003 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -658,16 +658,34 @@ pub struct ProposerData { pub slot: Slot, } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Deserialize)] pub struct ValidatorBlocksQuery { - pub randao_reveal: Option, + pub randao_reveal: SignatureBytes, pub graffiti: Option, - #[serde(default = "default_verify_randao")] - pub verify_randao: bool, + pub skip_randao_verification: SkipRandaoVerification, } -fn default_verify_randao() -> bool { - true +#[derive(Debug, Clone, Copy, 
Default, PartialEq, Eq, Deserialize)] +#[serde(try_from = "Option")] +pub enum SkipRandaoVerification { + Yes, + #[default] + No, +} + +/// Parse a `skip_randao_verification` query parameter. +impl TryFrom> for SkipRandaoVerification { + type Error = String; + + fn try_from(opt: Option) -> Result { + match opt.as_deref() { + None => Ok(SkipRandaoVerification::No), + Some("") => Ok(SkipRandaoVerification::Yes), + Some(s) => Err(format!( + "skip_randao_verification does not take a value, got: {s}" + )), + } + } } #[derive(Clone, Serialize, Deserialize)] diff --git a/consensus/types/src/beacon_block.rs b/consensus/types/src/beacon_block.rs index da8566dcb2..0ec1f9a374 100644 --- a/consensus/types/src/beacon_block.rs +++ b/consensus/types/src/beacon_block.rs @@ -66,6 +66,8 @@ pub struct BeaconBlock = FullPayload> { pub body: BeaconBlockBodyMerge, } +pub type BlindedBeaconBlock = BeaconBlock>; + impl> SignedRoot for BeaconBlock {} impl<'a, T: EthSpec, Payload: ExecPayload> SignedRoot for BeaconBlockRef<'a, T, Payload> {} diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index f05012c0b7..32300173eb 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -99,7 +99,7 @@ pub use crate::attestation_duty::AttestationDuty; pub use crate::attester_slashing::AttesterSlashing; pub use crate::beacon_block::{ BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, BeaconBlockRef, - BeaconBlockRefMut, + BeaconBlockRefMut, BlindedBeaconBlock, }; pub use crate::beacon_block_body::{ BeaconBlockBody, BeaconBlockBodyAltair, BeaconBlockBodyBase, BeaconBlockBodyMerge, diff --git a/crypto/bls/src/generic_signature.rs b/crypto/bls/src/generic_signature.rs index 10ef75fc68..01e5ed1d48 100644 --- a/crypto/bls/src/generic_signature.rs +++ b/crypto/bls/src/generic_signature.rs @@ -80,6 +80,18 @@ where self.point.is_none() } + /// Initialize self to the point-at-infinity. 
+ /// + /// In general `AggregateSignature::infinity` should be used in favour of this function. + pub fn infinity() -> Result { + Self::deserialize(&INFINITY_SIGNATURE) + } + + /// Returns `true` if `self` is equal to the point at infinity. + pub fn is_infinity(&self) -> bool { + self.is_infinity + } + /// Returns a reference to the underlying BLS point. pub(crate) fn point(&self) -> Option<&Sig> { self.point.as_ref() From 507bb9dad46a02e6331d936d5a9974306d00ba80 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 19 Sep 2022 07:58:49 +0000 Subject: [PATCH 07/10] Refined payload pruning (#3587) ## Proposed Changes Improve the payload pruning feature in several ways: - Payload pruning is now entirely optional. It is enabled by default but can be disabled with `--prune-payloads false`. The previous `--prune-payloads-on-startup` flag from #3565 is removed. - Initial payload pruning on startup now runs in a background thread. This thread will always load the split state, which is a small fraction of its total work (up to ~300ms) and then backtrack from that state. This pruning process ran in 2m5s on one Prater node with good I/O and 16m on a node with slower I/O. - To work with the optional payload pruning the database function `try_load_full_block` will now attempt to load execution payloads for finalized slots _if_ pruning is currently disabled. This gives users an opt-out for the extensive traffic between the CL and EL for reconstructing payloads. ## Additional Info If the `prune-payloads` flag is toggled on and off then the on-startup check may not see any payloads to delete and fail to clean them up. In this case the `lighthouse db prune_payloads` command should be used to force a manual sweep of the database. 
--- beacon_node/beacon_chain/src/builder.rs | 21 ++++--- beacon_node/src/cli.rs | 8 ++- beacon_node/src/config.rs | 6 +- beacon_node/store/src/config.rs | 6 +- beacon_node/store/src/hot_cold_store.rs | 75 +++++++++++++++++++------ lighthouse/tests/beacon_node.rs | 8 +-- 6 files changed, 85 insertions(+), 39 deletions(-) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 45375ec01e..916ebd2359 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -266,13 +266,6 @@ where self.genesis_time = Some(genesis_state.genesis_time()); - // Prune finalized execution payloads. - if store.get_config().prune_payloads_on_init { - store - .try_prune_execution_payloads(false) - .map_err(|e| format!("Error pruning execution payloads: {e:?}"))?; - } - self.op_pool = Some( store .get_item::>(&OP_POOL_DB_KEY) @@ -863,6 +856,20 @@ where beacon_chain.store_migrator.process_reconstruction(); } + // Prune finalized execution payloads in the background. + if beacon_chain.store.get_config().prune_payloads { + let store = beacon_chain.store.clone(); + let log = log.clone(); + beacon_chain.task_executor.spawn_blocking( + move || { + if let Err(e) = store.try_prune_execution_payloads(false) { + error!(log, "Error pruning payloads in background"; "error" => ?e); + } + }, + "prune_payloads_background", + ); + } + Ok(beacon_chain) } } diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index d9d5c715b4..9b5f65622a 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -516,9 +516,11 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("true") ) .arg( - Arg::with_name("prune-payloads-on-startup") - .long("prune-payloads-on-startup") - .help("Check for execution payloads to prune on start-up.") + Arg::with_name("prune-payloads") + .long("prune-payloads") + .help("Prune execution payloads from Lighthouse's database. 
This saves space but \ + imposes load on the execution client, as payloads need to be \ + reconstructed and sent to syncing peers.") .takes_value(true) .default_value("true") ) diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 368fce573d..982cb82ed4 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -358,10 +358,8 @@ pub fn get_config( .map_err(|_| "auto-compact-db takes a boolean".to_string())?; } - if let Some(prune_payloads_on_init) = - clap_utils::parse_optional(cli_args, "prune-payloads-on-startup")? - { - client_config.store.prune_payloads_on_init = prune_payloads_on_init; + if let Some(prune_payloads) = clap_utils::parse_optional(cli_args, "prune-payloads")? { + client_config.store.prune_payloads = prune_payloads; } /* diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 9bc9ee8a45..027b8152ee 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -21,8 +21,8 @@ pub struct StoreConfig { pub compact_on_init: bool, /// Whether to compact the database during database pruning. pub compact_on_prune: bool, - /// Whether to try pruning execution payloads on initialization. - pub prune_payloads_on_init: bool, + /// Whether to prune payloads on initialization and finalization. + pub prune_payloads: bool, } /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params. 
@@ -45,7 +45,7 @@ impl Default for StoreConfig { block_cache_size: DEFAULT_BLOCK_CACHE_SIZE, compact_on_init: false, compact_on_prune: true, - prune_payloads_on_init: true, + prune_payloads: true, } } } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 80de674e9a..aff2be4cf1 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -21,6 +21,7 @@ use crate::{ get_key_for_col, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem, StoreOp, }; +use itertools::process_results; use leveldb::iterator::LevelDBIterator; use lru::LruCache; use parking_lot::{Mutex, RwLock}; @@ -334,8 +335,10 @@ impl, Cold: ItemStore> HotColdDB }; // If the block is after the split point then we should have the full execution payload - // stored in the database. Otherwise, just return the blinded block. - // Hold the split lock so that it can't change. + // stored in the database. If it isn't but payload pruning is disabled, try to load it + // on-demand. + // + // Hold the split lock so that it can't change while loading the payload. let split = self.split.read_recursive(); let block = if blinded_block.message().execution_payload().is_err() @@ -348,6 +351,18 @@ impl, Cold: ItemStore> HotColdDB self.block_cache.lock().put(*block_root, full_block.clone()); DatabaseBlock::Full(full_block) + } else if !self.config.prune_payloads { + // If payload pruning is disabled there's a chance we may have the payload of + // this finalized block. Attempt to load it but don't error in case it's missing. + if let Some(payload) = self.get_execution_payload(block_root)? 
{ + DatabaseBlock::Full( + blinded_block + .try_into_full_block(Some(payload)) + .ok_or(Error::AddPayloadLogicError)?, + ) + } else { + DatabaseBlock::Blinded(blinded_block) + } } else { DatabaseBlock::Blinded(blinded_block) }; @@ -388,7 +403,9 @@ impl, Cold: ItemStore> HotColdDB blinded_block: SignedBeaconBlock>, ) -> Result, Error> { if blinded_block.message().execution_payload().is_ok() { - let execution_payload = self.get_execution_payload(block_root)?; + let execution_payload = self + .get_execution_payload(block_root)? + .ok_or(HotColdDBError::MissingExecutionPayload(*block_root))?; blinded_block.try_into_full_block(Some(execution_payload)) } else { blinded_block.try_into_full_block(None) @@ -433,9 +450,8 @@ impl, Cold: ItemStore> HotColdDB pub fn get_execution_payload( &self, block_root: &Hash256, - ) -> Result, Error> { - self.get_item(block_root)? - .ok_or_else(|| HotColdDBError::MissingExecutionPayload(*block_root).into()) + ) -> Result>, Error> { + self.get_item(block_root) } /// Check if the execution payload for a block exists on disk. @@ -1446,19 +1462,36 @@ impl, Cold: ItemStore> HotColdDB // The finalized block may or may not have its execution payload stored, depending on // whether it was at a skipped slot. However for a fully pruned database its parent - // should *always* have been pruned. - let split_parent_block_root = split_state.get_block_root(split.slot - 1)?; - if !self.execution_payload_exists(split_parent_block_root)? && !force { + // should *always* have been pruned. In case of a long split (no parent found) we + // continue as if the payloads are pruned, as the node probably has other things to worry + // about. 
+ let split_block_root = split_state.get_latest_block_root(split.state_root); + + let already_pruned = + process_results(split_state.rev_iter_block_roots(&self.spec), |mut iter| { + iter.find(|(_, block_root)| *block_root != split_block_root) + .map_or(Ok(true), |(_, split_parent_root)| { + self.execution_payload_exists(&split_parent_root) + .map(|exists| !exists) + }) + })??; + + if already_pruned && !force { info!(self.log, "Execution payloads are pruned"); return Ok(()); } // Iterate block roots backwards to the Bellatrix fork or the anchor slot, whichever comes // first. - let split_block_root = split_state.get_latest_block_root(split.state_root); + warn!( + self.log, + "Pruning finalized payloads"; + "info" => "you may notice degraded I/O performance while this runs" + ); let anchor_slot = self.get_anchor_info().map(|info| info.anchor_slot); let mut ops = vec![]; + let mut last_pruned_block_root = None; for res in std::iter::once(Ok((split_block_root, split.slot))) .chain(BlockRootsIterator::new(self, &split_state)) @@ -1468,7 +1501,7 @@ impl, Cold: ItemStore> HotColdDB Err(e) => { warn!( self.log, - "Stopping backtrack early"; + "Stopping payload pruning early"; "error" => ?e, ); break; @@ -1478,25 +1511,28 @@ impl, Cold: ItemStore> HotColdDB if slot < bellatrix_fork_slot { info!( self.log, - "Finished backtrack to Bellatrix fork"; + "Payload pruning reached Bellatrix boundary"; ); break; } - if self.execution_payload_exists(&block_root)? { + if Some(block_root) != last_pruned_block_root + && self.execution_payload_exists(&block_root)? 
+ { debug!( self.log, "Pruning execution payload"; "slot" => slot, "block_root" => ?block_root, ); + last_pruned_block_root = Some(block_root); ops.push(StoreOp::DeleteExecutionPayload(block_root)); } if Some(slot) == anchor_slot { info!( self.log, - "Finished backtrack to anchor state"; + "Payload pruning reached anchor state"; "slot" => slot ); break; @@ -1583,10 +1619,13 @@ pub fn migrate_database, Cold: ItemStore>( // Delete the old summary, and the full state if we lie on an epoch boundary. hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot))); - // Delete the execution payload. Even if this execution payload is the payload of the - // new finalized block it is OK to delete it, as `try_get_full_block` looks at the split - // slot when determining whether to reconstruct payloads. - hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root)); + // Delete the execution payload if payload pruning is enabled. At a skipped slot we may + // delete the payload for the finalized block itself, but that's OK as we only guarantee + // that payloads are present for slots >= the split slot. The payload fetching code is also + // forgiving of missing payloads. + if store.config.prune_payloads { + hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root)); + } } // Warning: Critical section. 
We have to take care not to put any of the two databases in an diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index aed8ebf394..661bbcdb0c 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1227,17 +1227,17 @@ fn compact_db_flag() { .with_config(|config| assert!(config.store.compact_on_init)); } #[test] -fn prune_payloads_on_startup_default() { +fn prune_payloads_default() { CommandLineTest::new() .run_with_zero_port() - .with_config(|config| assert!(config.store.prune_payloads_on_init)); + .with_config(|config| assert!(config.store.prune_payloads)); } #[test] fn prune_payloads_on_startup_false() { CommandLineTest::new() - .flag("prune-payloads-on-startup", Some("false")) + .flag("prune-payloads", Some("false")) .run_with_zero_port() - .with_config(|config| assert!(!config.store.prune_payloads_on_init)); + .with_config(|config| assert!(!config.store.prune_payloads)); } #[test] fn reconstruct_historic_states_flag() { From a95bcba2ab9f88cf24ae09ccab7263014d82b476 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 19 Sep 2022 07:58:50 +0000 Subject: [PATCH 08/10] Avoid holding write-lock whilst waiting on shuffling cache promise (#3589) ## Issue Addressed NA ## Proposed Changes Fixes a bug which hogged the write-lock for the `shuffling_cache`. ## Additional Info NA --- beacon_node/beacon_chain/src/beacon_chain.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 609969a9a6..a0377f5fc0 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -4491,6 +4491,10 @@ impl BeaconChain { metrics::stop_timer(cache_wait_timer); if let Some(cache_item) = shuffling_cache.get(&shuffling_id) { + // The shuffling cache is no longer required, drop the write-lock to allow concurrent + // access. 
+ drop(shuffling_cache); + let committee_cache = cache_item.wait()?; map_fn(&committee_cache, shuffling_id.shuffling_decision_block) } else { From 96692b8e43c63897009fd914910a487c45f506d0 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 21 Sep 2022 01:01:50 +0000 Subject: [PATCH 09/10] Impl `oneshot_broadcast` for committee promises (#3595) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Issue Addressed NA ## Proposed Changes Fixes an issue introduced in #3574 where I erroneously assumed that a `crossbeam_channel` multiple receiver queue was a *broadcast* queue. This is incorrect, each message will be received by *only one* receiver. The effect of this mistake is these logs: ``` Sep 20 06:56:17.001 INFO Synced slot: 4736079, block: 0xaa8a…180d, epoch: 148002, finalized_epoch: 148000, finalized_root: 0x2775…47f2, exec_hash: 0x2ca5…ffde (verified), peers: 6, service: slot_notifier Sep 20 06:56:23.237 ERRO Unable to validate attestation error: CommitteeCacheWait(RecvError), peer_id: 16Uiu2HAm2Jnnj8868tb7hCta1rmkXUf5YjqUH1YPj35DCwNyeEzs, type: "aggregated", slot: Slot(4736047), beacon_block_root: 0x88d318534b1010e0ebd79aed60b6b6da1d70357d72b271c01adf55c2b46206c1 ``` ## Additional Info NA --- Cargo.lock | 9 +- Cargo.toml | 1 + beacon_node/beacon_chain/Cargo.toml | 2 +- beacon_node/beacon_chain/src/beacon_chain.rs | 8 +- beacon_node/beacon_chain/src/errors.rs | 2 +- .../beacon_chain/src/shuffling_cache.rs | 18 +- common/oneshot_broadcast/Cargo.toml | 9 + common/oneshot_broadcast/src/lib.rs | 188 ++++++++++++++++++ 8 files changed, 218 insertions(+), 19 deletions(-) create mode 100644 common/oneshot_broadcast/Cargo.toml create mode 100644 common/oneshot_broadcast/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 4ca2739d14..9cd7ff2ff9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -387,7 +387,6 @@ version = "0.2.0" dependencies = [ "bitvec 0.20.4", "bls", - "crossbeam-channel", "derivative", "environment", "eth1", @@ 
-410,6 +409,7 @@ dependencies = [ "lru", "maplit", "merkle_proof", + "oneshot_broadcast", "operation_pool", "parking_lot 0.12.1", "proto_array", @@ -4343,6 +4343,13 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" +[[package]] +name = "oneshot_broadcast" +version = "0.1.0" +dependencies = [ + "parking_lot 0.12.1", +] + [[package]] name = "oorandom" version = "11.1.3" diff --git a/Cargo.toml b/Cargo.toml index a71a97a959..415c721d99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ members = [ "common/logging", "common/lru_cache", "common/malloc_utils", + "common/oneshot_broadcast", "common/sensitive_url", "common/slot_clock", "common/task_executor", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 43cbdf1347..dd185ac757 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -63,7 +63,7 @@ superstruct = "0.5.0" hex = "0.4.2" exit-future = "0.2.0" unused_port = {path = "../../common/unused_port"} -crossbeam-channel = "0.5.6" +oneshot_broadcast = { path = "../../common/oneshot_broadcast" } [[test]] name = "beacon_chain_tests" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a0377f5fc0..96439f4908 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -4609,13 +4609,7 @@ impl BeaconChain { metrics::stop_timer(committee_building_timer); - if let Err(e) = sender.send(committee_cache.clone()) { - debug!( - self.log, - "Did not fulfil committee promise"; - "error" => %e - ) - } + sender.send(committee_cache.clone()); map_fn(&committee_cache, shuffling_decision_block) } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 8b547acf0f..704cba489d 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ 
b/beacon_node/beacon_chain/src/errors.rs @@ -202,7 +202,7 @@ pub enum BeaconChainError { }, AttestationHeadNotInForkChoice(Hash256), MissingPersistedForkChoice, - CommitteeCacheWait(crossbeam_channel::RecvError), + CommitteePromiseFailed(oneshot_broadcast::Error), MaxCommitteePromises(usize), } diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 3fc5bebdf6..a01847a0e1 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -1,6 +1,6 @@ use crate::{metrics, BeaconChainError}; -use crossbeam_channel::{bounded, Receiver, Sender, TryRecvError}; use lru::LruCache; +use oneshot_broadcast::{oneshot, Receiver, Sender}; use std::sync::Arc; use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256}; @@ -40,7 +40,7 @@ impl CacheItem { CacheItem::Committee(cache) => Ok(cache), CacheItem::Promise(receiver) => receiver .recv() - .map_err(BeaconChainError::CommitteeCacheWait), + .map_err(BeaconChainError::CommitteePromiseFailed), } } } @@ -72,7 +72,7 @@ impl ShufflingCache { item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() { // The promise has already been resolved. Replace the entry in the cache with a // `Committee` entry and then return the committee. - Ok(committee) => { + Ok(Some(committee)) => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); let ready = CacheItem::Committee(committee); @@ -81,7 +81,7 @@ impl ShufflingCache { } // The promise has not yet been resolved. Return the promise so the caller can await // it. 
- Err(TryRecvError::Empty) => { + Ok(None) => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); item.cloned() @@ -96,7 +96,7 @@ impl ShufflingCache { // memory and the nature of the LRU cache means that future, relevant entries will // still be added to the cache. We expect that *all* promises should be resolved, // unless there is a programming or database error. - Err(TryRecvError::Disconnected) => { + Err(oneshot_broadcast::Error::SenderDropped) => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_FAILS); metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); self.cache.pop(key); @@ -147,7 +147,7 @@ impl ShufflingCache { return Err(BeaconChainError::MaxCommitteePromises(num_active_promises)); } - let (sender, receiver) = bounded(1); + let (sender, receiver) = oneshot(); self.cache.put(key, CacheItem::Promise(receiver)); Ok(sender) } @@ -262,7 +262,7 @@ mod test { ); // Resolve the promise. - sender.send(committee_a.clone()).unwrap(); + sender.send(committee_a.clone()); // Ensure the promise has been resolved. let item = cache.get(&id_a).unwrap(); @@ -324,7 +324,7 @@ mod test { ); // Resolve promise A. - sender_a.send(committee_a.clone()).unwrap(); + sender_a.send(committee_a.clone()); // Ensure promise A has been resolved. let item = cache.get(&id_a).unwrap(); assert!( @@ -333,7 +333,7 @@ mod test { ); // Resolve promise B. - sender_b.send(committee_b.clone()).unwrap(); + sender_b.send(committee_b.clone()); // Ensure promise B has been resolved. 
let item = cache.get(&id_b).unwrap(); assert!( diff --git a/common/oneshot_broadcast/Cargo.toml b/common/oneshot_broadcast/Cargo.toml new file mode 100644 index 0000000000..baefe10661 --- /dev/null +++ b/common/oneshot_broadcast/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "oneshot_broadcast" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +parking_lot = "0.12.0" diff --git a/common/oneshot_broadcast/src/lib.rs b/common/oneshot_broadcast/src/lib.rs new file mode 100644 index 0000000000..237fc03ccd --- /dev/null +++ b/common/oneshot_broadcast/src/lib.rs @@ -0,0 +1,188 @@ +//! Provides a single-sender, multiple receiver one-shot channel where any message sent will be +//! received by all receivers. +//! +//! This implementation may not be blazingly fast but it should be simple enough to be reliable. +use parking_lot::{Condvar, Mutex}; +use std::sync::{Arc, Weak}; + +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum Error { + SenderDropped, +} + +enum Future { + /// The future is ready and the item may be consumed. + Ready(T), + /// Future is not ready. The contained `Weak` is a reference to the `Sender` that may be used to + /// detect when the channel is disconnected. + NotReady(Weak<()>), +} + +struct MutexCondvar { + mutex: Mutex>, + condvar: Condvar, +} + +/// The sending pair of the `oneshot` channel. +pub struct Sender(Arc>, Option>); + +impl Sender { + /// Send a message, consuming `self` and delivering the message to *all* receivers. + pub fn send(self, item: T) { + *self.0.mutex.lock() = Future::Ready(item); + // Condvar notification will be handled by the `Drop` implementation. + } +} + +impl Drop for Sender { + /// Drop the `Arc` and notify all receivers so they can't upgrade their `Weak`s and know that + /// the sender has been dropped. 
+ fn drop(&mut self) { + self.1 = None; + self.0.condvar.notify_all(); + } +} + +/// The receiving pair of the `oneshot` channel. Always receives the message sent by the `Sender` +/// (if any). +#[derive(Clone)] +pub struct Receiver(Arc>); + +impl Receiver { + /// Check to see if there is a message to be read *without* blocking/waiting. + /// + /// ## Note + /// + /// This method will technically perform *some* blocking to access a `Mutex`. It is non-blocking + /// in the sense that it won't block until a message is received (i.e., it may return `Ok(None)` + /// if no message has been sent yet). + pub fn try_recv(&self) -> Result, Error> { + match &*self.0.mutex.lock() { + Future::Ready(item) => Ok(Some(item.clone())), + Future::NotReady(weak) if weak.upgrade().is_some() => Ok(None), + Future::NotReady(_) => Err(Error::SenderDropped), + } + } + + /// Check to see if there is a message to be read whilst blocking/waiting until a message is + /// sent or the `Sender` is dropped. + pub fn recv(self) -> Result { + let mut lock = self.0.mutex.lock(); + loop { + match &*lock { + Future::Ready(item) => return Ok(item.clone()), + Future::NotReady(weak) if weak.upgrade().is_some() => { + self.0.condvar.wait(&mut lock) + } + Future::NotReady(_) => return Err(Error::SenderDropped), + } + } + } +} + +/// A single-sender, multiple-receiver broadcast channel. +/// +/// The sender may send *only one* message which will be received by *all* receivers. 
+pub fn oneshot() -> (Sender, Receiver) { + let sender_ref = Arc::new(()); + let mutex_condvar = Arc::new(MutexCondvar { + mutex: Mutex::new(Future::NotReady(Arc::downgrade(&sender_ref))), + condvar: Condvar::new(), + }); + let receiver = Receiver(mutex_condvar.clone()); + let sender = Sender(mutex_condvar, Some(sender_ref)); + (sender, receiver) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::thread; + use std::time::Duration; + + #[test] + fn single_thread_try_recv() { + let (sender, receiver) = oneshot(); + assert_eq!(receiver.try_recv(), Ok(None)); + sender.send(42); + assert_eq!(receiver.try_recv(), Ok(Some(42))); + } + + #[test] + fn single_thread_try_recv_no_message() { + let (sender, receiver) = oneshot::(); + assert_eq!(receiver.try_recv(), Ok(None)); + drop(sender); + assert_eq!(receiver.try_recv(), Err(Error::SenderDropped)); + } + + #[test] + fn single_thread_recv() { + let (sender, receiver) = oneshot(); + assert_eq!(receiver.try_recv(), Ok(None)); + sender.send(42); + assert_eq!(receiver.recv(), Ok(42)); + } + + #[test] + fn single_thread_recv_no_message() { + let (sender, receiver) = oneshot::(); + assert_eq!(receiver.try_recv(), Ok(None)); + drop(sender); + assert_eq!(receiver.recv(), Err(Error::SenderDropped)); + } + + #[test] + fn two_threads_message_sent() { + let (sender, receiver) = oneshot(); + + let handle = thread::spawn(|| receiver.recv().unwrap()); + + sender.send(42); + assert_eq!(handle.join().unwrap(), 42); + } + + #[test] + fn three_threads_message_set() { + let (sender, receiver) = oneshot(); + + let receiver_a = receiver.clone(); + let handle_a = thread::spawn(|| receiver_a.recv().unwrap()); + let handle_b = thread::spawn(|| receiver.recv().unwrap()); + + sender.send(42); + assert_eq!(handle_a.join().unwrap(), 42); + assert_eq!(handle_b.join().unwrap(), 42); + } + + #[test] + fn three_threads_sender_dropped() { + let (sender, receiver) = oneshot::(); + + let receiver_a = receiver.clone(); + let handle_a = thread::spawn(|| 
receiver_a.recv()); + let handle_b = thread::spawn(|| receiver.recv()); + + drop(sender); + assert_eq!(handle_a.join().unwrap(), Err(Error::SenderDropped)); + assert_eq!(handle_b.join().unwrap(), Err(Error::SenderDropped)); + } + + #[test] + fn sender_dropped_after_recv() { + let (sender_a, receiver_a) = oneshot(); + let (sender_b, receiver_b) = oneshot::(); + + let handle_0 = thread::spawn(|| { + sender_a.send(1); + receiver_b.recv() + }); + + assert_eq!(receiver_a.recv(), Ok(1)); + // This is a slightly hacky sleep that assumes that the thread has had enough time after + // sending down `sender_a` to start listening to `receiver_b`. + thread::sleep(Duration::from_secs(1)); + drop(sender_b); + assert_eq!(handle_0.join().unwrap(), Err(Error::SenderDropped)) + } +} From dadbd69eec195c42265bedd7d661077784b541c3 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 21 Sep 2022 10:52:14 +0000 Subject: [PATCH 10/10] Fix concurrency issue with `oneshot_broadcast` (#3596) ## Issue Addressed NA ## Proposed Changes Fixes an issue found during testing with #3595. ## Additional Info NA --- common/oneshot_broadcast/src/lib.rs | 36 +++++++++++++++-------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/common/oneshot_broadcast/src/lib.rs b/common/oneshot_broadcast/src/lib.rs index 237fc03ccd..2c616b3bb3 100644 --- a/common/oneshot_broadcast/src/lib.rs +++ b/common/oneshot_broadcast/src/lib.rs @@ -3,7 +3,7 @@ //! //! This implementation may not be blazingly fast but it should be simple enough to be reliable. use parking_lot::{Condvar, Mutex}; -use std::sync::{Arc, Weak}; +use std::sync::Arc; #[derive(Copy, Clone, Debug, PartialEq)] pub enum Error { @@ -13,9 +13,10 @@ pub enum Error { enum Future { /// The future is ready and the item may be consumed. Ready(T), - /// Future is not ready. The contained `Weak` is a reference to the `Sender` that may be used to - /// detect when the channel is disconnected. - NotReady(Weak<()>), + /// Future is not ready. 
+ NotReady, + /// The sender has been dropped without sending a message. + SenderDropped, } struct MutexCondvar { @@ -24,7 +25,7 @@ struct MutexCondvar { } /// The sending pair of the `oneshot` channel. -pub struct Sender(Arc>, Option>); +pub struct Sender(Arc>); impl Sender { /// Send a message, consuming `self` and delivering the message to *all* receivers. @@ -35,11 +36,15 @@ impl Sender { } impl Drop for Sender { - /// Drop the `Arc` and notify all receivers so they can't upgrade their `Weak`s and know that - /// the sender has been dropped. + /// Flag the sender as dropped and notify all receivers. fn drop(&mut self) { - self.1 = None; + let mut lock = self.0.mutex.lock(); + if !matches!(*lock, Future::Ready(_)) { + *lock = Future::SenderDropped + } self.0.condvar.notify_all(); + // The lock must be held whilst the condvar is notified. + drop(lock); } } @@ -59,8 +64,8 @@ impl Receiver { pub fn try_recv(&self) -> Result, Error> { match &*self.0.mutex.lock() { Future::Ready(item) => Ok(Some(item.clone())), - Future::NotReady(weak) if weak.upgrade().is_some() => Ok(None), - Future::NotReady(_) => Err(Error::SenderDropped), + Future::NotReady => Ok(None), + Future::SenderDropped => Err(Error::SenderDropped), } } @@ -71,10 +76,8 @@ impl Receiver { loop { match &*lock { Future::Ready(item) => return Ok(item.clone()), - Future::NotReady(weak) if weak.upgrade().is_some() => { - self.0.condvar.wait(&mut lock) - } - Future::NotReady(_) => return Err(Error::SenderDropped), + Future::NotReady => self.0.condvar.wait(&mut lock), + Future::SenderDropped => return Err(Error::SenderDropped), } } } @@ -84,13 +87,12 @@ impl Receiver { /// /// The sender may send *only one* message which will be received by *all* receivers. 
pub fn oneshot() -> (Sender, Receiver) { - let sender_ref = Arc::new(()); let mutex_condvar = Arc::new(MutexCondvar { - mutex: Mutex::new(Future::NotReady(Arc::downgrade(&sender_ref))), + mutex: Mutex::new(Future::NotReady), condvar: Condvar::new(), }); let receiver = Receiver(mutex_condvar.clone()); - let sender = Sender(mutex_condvar, Some(sender_ref)); + let sender = Sender(mutex_condvar); (sender, receiver) }