diff --git a/Cargo.lock b/Cargo.lock index 0590fe5080..fc47f85237 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -418,6 +418,7 @@ dependencies = [ "lru", "maplit", "merkle_proof", + "oneshot_broadcast", "operation_pool", "parking_lot 0.12.0", "proto_array", @@ -4440,6 +4441,13 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" +[[package]] +name = "oneshot_broadcast" +version = "0.1.0" +dependencies = [ + "parking_lot 0.12.0", +] + [[package]] name = "oorandom" version = "11.1.3" diff --git a/Cargo.toml b/Cargo.toml index 7ef316f630..8ac247734c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ members = [ "common/logging", "common/lru_cache", "common/malloc_utils", + "common/oneshot_broadcast", "common/sensitive_url", "common/slot_clock", "common/task_executor", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 092f3064d5..dd185ac757 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -63,6 +63,7 @@ superstruct = "0.5.0" hex = "0.4.2" exit-future = "0.2.0" unused_port = {path = "../../common/unused_port"} +oneshot_broadcast = { path = "../../common/oneshot_broadcast" } [[test]] name = "beacon_chain_tests" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e1ef5f7c58..e8b7e421e7 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2633,7 +2633,7 @@ impl BeaconChain { self.shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) .ok_or(Error::AttestationCacheLockTimeout)? - .insert(shuffling_id, committee_cache); + .insert_committee_cache(shuffling_id, committee_cache); } } @@ -4430,9 +4430,22 @@ impl BeaconChain { metrics::stop_timer(cache_wait_timer); - if let Some(committee_cache) = shuffling_cache.get(&shuffling_id) { - map_fn(committee_cache, shuffling_id.shuffling_decision_block) + if let Some(cache_item) = shuffling_cache.get(&shuffling_id) { + // The shuffling cache is no longer required, drop the write-lock to allow concurrent + // access. + drop(shuffling_cache); + + let committee_cache = cache_item.wait()?; + map_fn(&committee_cache, shuffling_id.shuffling_decision_block) } else { + // Create an entry in the cache that "promises" this value will eventually be computed. + // This avoids the case where multiple threads attempt to produce the same value at the + // same time. + // + // Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same + // promise from being created twice. + let sender = shuffling_cache.create_promise(shuffling_id.clone())?; + // Drop the shuffling cache to avoid holding the lock for any longer than // required. drop(shuffling_cache); @@ -4519,17 +4532,19 @@ impl BeaconChain { state.build_committee_cache(relative_epoch, &self.spec)?; - let committee_cache = state.committee_cache(relative_epoch)?; + let committee_cache = state.committee_cache(relative_epoch)?.clone(); let shuffling_decision_block = shuffling_id.shuffling_decision_block; self.shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) .ok_or(Error::AttestationCacheLockTimeout)? - .insert(shuffling_id, committee_cache); + .insert_committee_cache(shuffling_id, &committee_cache); metrics::stop_timer(committee_building_timer); - map_fn(committee_cache, shuffling_decision_block) + sender.send(committee_cache.clone()); + + map_fn(&committee_cache, shuffling_decision_block) } } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 646c3840fe..37e3c35293 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -874,6 +874,20 @@ where beacon_chain.store_migrator.process_reconstruction(); } + // Prune finalized execution payloads in the background. + if beacon_chain.store.get_config().prune_payloads { + let store = beacon_chain.store.clone(); + let log = log.clone(); + beacon_chain.task_executor.spawn_blocking( + move || { + if let Err(e) = store.try_prune_execution_payloads(false) { + error!(log, "Error pruning payloads in background"; "error" => ?e); + } + }, + "prune_payloads_background", + ); + } + Ok(beacon_chain) } } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index e904a26679..7a23ff675d 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -202,6 +202,8 @@ pub enum BeaconChainError { }, AttestationHeadNotInForkChoice(Hash256), MissingPersistedForkChoice, + CommitteePromiseFailed(oneshot_broadcast::Error), + MaxCommitteePromises(usize), } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index e1e48efacb..8e56a34bae 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -250,6 +250,10 @@ lazy_static! { try_create_int_counter("beacon_shuffling_cache_hits_total", "Count of times shuffling cache fulfils request"); pub static ref SHUFFLING_CACHE_MISSES: Result = try_create_int_counter("beacon_shuffling_cache_misses_total", "Count of times shuffling cache fulfils request"); + pub static ref SHUFFLING_CACHE_PROMISE_HITS: Result = + try_create_int_counter("beacon_shuffling_cache_promise_hits_total", "Count of times shuffling cache returns a promise to future shuffling"); + pub static ref SHUFFLING_CACHE_PROMISE_FAILS: Result = + try_create_int_counter("beacon_shuffling_cache_promise_fails_total", "Count of times shuffling cache detects a failed promise"); /* * Early attester cache diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 0bbd4419b9..a01847a0e1 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -1,5 +1,7 @@ -use crate::metrics; +use crate::{metrics, BeaconChainError}; use lru::LruCache; +use oneshot_broadcast::{oneshot, Receiver, Sender}; +use std::sync::Arc; use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256}; /// The size of the LRU cache that stores committee caches for quicker verification. @@ -9,12 +11,46 @@ use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256 /// ignores a few extra bytes in the caches that should be insignificant compared to the indices). const CACHE_SIZE: usize = 16; +/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this +/// limits the number of concurrent states that can be loaded into memory for the committee cache. +/// This prevents excessive memory usage at the cost of rejecting some attestations. +/// +/// We set this value to 2 since states can be quite large and have a significant impact on memory +/// usage. A healthy network cannot have more than a few committee caches and those caches should +/// always be inserted during block import. Unstable networks with a high degree of forking might +/// see some attestations dropped due to this concurrency limit, however I propose that this is +/// better than low-resource nodes going OOM. +const MAX_CONCURRENT_PROMISES: usize = 2; + +#[derive(Clone)] +pub enum CacheItem { + /// A committee. + Committee(Arc), + /// A promise for a future committee. + Promise(Receiver>), +} + +impl CacheItem { + pub fn is_promise(&self) -> bool { + matches!(self, CacheItem::Promise(_)) + } + + pub fn wait(self) -> Result, BeaconChainError> { + match self { + CacheItem::Committee(cache) => Ok(cache), + CacheItem::Promise(receiver) => receiver + .recv() + .map_err(BeaconChainError::CommitteePromiseFailed), + } + } +} + /// Provides an LRU cache for `CommitteeCache`. /// /// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like /// a find/replace error. pub struct ShufflingCache { - cache: LruCache, + cache: LruCache, } impl ShufflingCache { @@ -24,27 +60,114 @@ impl ShufflingCache { } } - pub fn get(&mut self, key: &AttestationShufflingId) -> Option<&CommitteeCache> { - let opt = self.cache.get(key); - - if opt.is_some() { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); - } else { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); + pub fn get(&mut self, key: &AttestationShufflingId) -> Option { + match self.cache.get(key) { + // The cache contained the committee cache, return it. + item @ Some(CacheItem::Committee(_)) => { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); + item.cloned() + } + // The cache contains a promise for the committee cache. Check to see if the promise has + // already been resolved, without waiting for it. + item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() { + // The promise has already been resolved. Replace the entry in the cache with a + // `Committee` entry and then return the committee. + Ok(Some(committee)) => { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); + metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); + let ready = CacheItem::Committee(committee); + self.cache.put(key.clone(), ready.clone()); + Some(ready) + } + // The promise has not yet been resolved. Return the promise so the caller can await + // it. + Ok(None) => { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); + metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); + item.cloned() + } + // The sender has been dropped without sending a committee. There was most likely an + // error computing the committee cache. Drop the key from the cache and return + // `None` so the caller can recompute the committee. + // + // It's worth noting that this is the only place where we removed unresolved + // promises from the cache. This means unresolved promises will only be removed if + // we try to access them again. This is OK, since the promises don't consume much + // memory and the nature of the LRU cache means that future, relevant entries will + // still be added to the cache. We expect that *all* promises should be resolved, + // unless there is a programming or database error. + Err(oneshot_broadcast::Error::SenderDropped) => { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_FAILS); + metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); + self.cache.pop(key); + None + } + }, + // The cache does not have this committee and it's not already promised to be computed. + None => { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); + None + } } - - opt } pub fn contains(&self, key: &AttestationShufflingId) -> bool { self.cache.contains(key) } - pub fn insert(&mut self, key: AttestationShufflingId, committee_cache: &CommitteeCache) { - if !self.cache.contains(&key) { - self.cache.put(key, committee_cache.clone()); + pub fn insert_committee_cache( + &mut self, + key: AttestationShufflingId, + committee_cache: &T, + ) { + if self + .cache + .get(&key) + // Replace the committee if it's not present or if it's a promise. A bird in the hand is + // worth two in the promise-bush! + .map_or(true, CacheItem::is_promise) + { + self.cache.put( + key, + CacheItem::Committee(committee_cache.to_arc_committee_cache()), + ); } } + + pub fn create_promise( + &mut self, + key: AttestationShufflingId, + ) -> Result>, BeaconChainError> { + let num_active_promises = self + .cache + .iter() + .filter(|(_, item)| item.is_promise()) + .count(); + if num_active_promises >= MAX_CONCURRENT_PROMISES { + return Err(BeaconChainError::MaxCommitteePromises(num_active_promises)); + } + + let (sender, receiver) = oneshot(); + self.cache.put(key, CacheItem::Promise(receiver)); + Ok(sender) + } +} + +/// A helper trait to allow lazy-cloning of the committee cache when inserting into the cache. +pub trait ToArcCommitteeCache { + fn to_arc_committee_cache(&self) -> Arc; +} + +impl ToArcCommitteeCache for CommitteeCache { + fn to_arc_committee_cache(&self) -> Arc { + Arc::new(self.clone()) + } +} + +impl ToArcCommitteeCache for Arc { + fn to_arc_committee_cache(&self) -> Arc { + self.clone() + } } impl Default for ShufflingCache { @@ -79,3 +202,177 @@ impl BlockShufflingIds { } } } + +// Disable tests in debug since the beacon chain harness is slow unless in release. +#[cfg(not(debug_assertions))] +#[cfg(test)] +mod test { + use super::*; + use crate::test_utils::EphemeralHarnessType; + use types::*; + + type BeaconChainHarness = + crate::test_utils::BeaconChainHarness>; + + /// Returns two different committee caches for testing. + fn committee_caches() -> (Arc, Arc) { + let harness = BeaconChainHarness::builder(MinimalEthSpec) + .default_spec() + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .build(); + let (mut state, _) = harness.get_current_state_and_root(); + state + .build_committee_cache(RelativeEpoch::Current, &harness.chain.spec) + .unwrap(); + state + .build_committee_cache(RelativeEpoch::Next, &harness.chain.spec) + .unwrap(); + let committee_a = state + .committee_cache(RelativeEpoch::Current) + .unwrap() + .clone(); + let committee_b = state.committee_cache(RelativeEpoch::Next).unwrap().clone(); + assert!(committee_a != committee_b); + (Arc::new(committee_a), Arc::new(committee_b)) + } + + /// Builds a deterministic but incoherent shuffling ID from a `u64`. + fn shuffling_id(id: u64) -> AttestationShufflingId { + AttestationShufflingId { + shuffling_epoch: id.into(), + shuffling_decision_block: Hash256::from_low_u64_be(id), + } + } + + #[test] + fn resolved_promise() { + let (committee_a, _) = committee_caches(); + let id_a = shuffling_id(1); + let mut cache = ShufflingCache::new(); + + // Create a promise. + let sender = cache.create_promise(id_a.clone()).unwrap(); + + // Retrieve the newly created promise. + let item = cache.get(&id_a).unwrap(); + assert!( + matches!(item, CacheItem::Promise(_)), + "the item should be a promise" + ); + + // Resolve the promise. + sender.send(committee_a.clone()); + + // Ensure the promise has been resolved. + let item = cache.get(&id_a).unwrap(); + assert!( + matches!(item, CacheItem::Committee(committee) if committee == committee_a), + "the promise should be resolved" + ); + assert_eq!(cache.cache.len(), 1, "the cache should have one entry"); + } + + #[test] + fn unresolved_promise() { + let id_a = shuffling_id(1); + let mut cache = ShufflingCache::new(); + + // Create a promise. + let sender = cache.create_promise(id_a.clone()).unwrap(); + + // Retrieve the newly created promise. + let item = cache.get(&id_a).unwrap(); + assert!( + matches!(item, CacheItem::Promise(_)), + "the item should be a promise" + ); + + // Drop the sender without resolving the promise, simulating an error computing the + // committee. + drop(sender); + + // Ensure the key now indicates an empty slot. + assert!(cache.get(&id_a).is_none(), "the slot should be empty"); + assert!(cache.cache.is_empty(), "the cache should be empty"); + } + + #[test] + fn two_promises() { + let (committee_a, committee_b) = committee_caches(); + let (id_a, id_b) = (shuffling_id(1), shuffling_id(2)); + let mut cache = ShufflingCache::new(); + + // Create promise A. + let sender_a = cache.create_promise(id_a.clone()).unwrap(); + + // Retrieve promise A. + let item = cache.get(&id_a).unwrap(); + assert!( + matches!(item, CacheItem::Promise(_)), + "item a should be a promise" + ); + + // Create promise B. + let sender_b = cache.create_promise(id_b.clone()).unwrap(); + + // Retrieve promise B. + let item = cache.get(&id_b).unwrap(); + assert!( + matches!(item, CacheItem::Promise(_)), + "item b should be a promise" + ); + + // Resolve promise A. + sender_a.send(committee_a.clone()); + // Ensure promise A has been resolved. + let item = cache.get(&id_a).unwrap(); + assert!( + matches!(item, CacheItem::Committee(committee) if committee == committee_a), + "promise A should be resolved" + ); + + // Resolve promise B. + sender_b.send(committee_b.clone()); + // Ensure promise B has been resolved. + let item = cache.get(&id_b).unwrap(); + assert!( + matches!(item, CacheItem::Committee(committee) if committee == committee_b), + "promise B should be resolved" + ); + + // Check both entries again. + assert!( + matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee) if committee == committee_a), + "promise A should remain resolved" + ); + assert!( + matches!(cache.get(&id_b).unwrap(), CacheItem::Committee(committee) if committee == committee_b), + "promise B should remain resolved" + ); + assert_eq!(cache.cache.len(), 2, "the cache should have two entries"); + } + + #[test] + fn too_many_promises() { + let mut cache = ShufflingCache::new(); + + for i in 0..MAX_CONCURRENT_PROMISES { + cache.create_promise(shuffling_id(i as u64)).unwrap(); + } + + // Ensure that the next promise returns an error. It is important for the application to + // dump his ass when he can't keep his promises, you're a queen and you deserve better. + assert!(matches!( + cache.create_promise(shuffling_id(MAX_CONCURRENT_PROMISES as u64)), + Err(BeaconChainError::MaxCommitteePromises( + MAX_CONCURRENT_PROMISES + )) + )); + assert_eq!( + cache.cache.len(), + MAX_CONCURRENT_PROMISES, + "the cache should have two entries" + ); + } +} diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 7593557396..70d59c1c2d 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -394,7 +394,7 @@ fn advance_head( .shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) .ok_or(BeaconChainError::AttestationCacheLockTimeout)? - .insert(shuffling_id.clone(), committee_cache); + .insert_committee_cache(shuffling_id.clone(), committee_cache); debug!( log, diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 776faba6c8..d313a86ef4 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -41,28 +41,27 @@ async fn get_chain_segment() -> Vec> { ) .await; - harness + let mut segment = Vec::with_capacity(CHAIN_SEGMENT_LENGTH); + for snapshot in harness .chain .chain_dump() .expect("should dump chain") .into_iter() - .map(|snapshot| { - let full_block = harness - .chain - .store - .make_full_block( - &snapshot.beacon_block_root, - snapshot.beacon_block.as_ref().clone(), - ) - .unwrap(); - BeaconSnapshot { - beacon_block_root: snapshot.beacon_block_root, - beacon_block: Arc::new(full_block), - beacon_state: snapshot.beacon_state, - } - }) .skip(1) - .collect() + { + let full_block = harness + .chain + .get_block(&snapshot.beacon_block_root) + .await + .unwrap() + .unwrap(); + segment.push(BeaconSnapshot { + beacon_block_root: snapshot.beacon_block_root, + beacon_block: Arc::new(full_block), + beacon_state: snapshot.beacon_state, + }); + } + segment } fn get_harness(validator_count: usize) -> BeaconChainHarness> { diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index da571e43c3..38de6a0feb 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2057,14 +2057,16 @@ async fn weak_subjectivity_sync() { assert_eq!(new_blocks[0].beacon_block.slot(), wss_slot + 1); for snapshot in new_blocks { - let block = &snapshot.beacon_block; let full_block = harness .chain - .store - .make_full_block(&snapshot.beacon_block_root, block.as_ref().clone()) + .get_block(&snapshot.beacon_block_root) + .await + .unwrap() .unwrap(); + let slot = full_block.slot(); + let state_root = full_block.state_root(); - beacon_chain.slot_clock.set_slot(block.slot().as_u64()); + beacon_chain.slot_clock.set_slot(slot.as_u64()); beacon_chain .process_block(Arc::new(full_block), CountUnrealized::True) .await @@ -2072,10 +2074,9 @@ async fn weak_subjectivity_sync() { beacon_chain.recompute_head_at_current_slot().await; // Check that the new block's state can be loaded correctly. - let state_root = block.state_root(); let mut state = beacon_chain .store - .get_state(&state_root, Some(block.slot())) + .get_state(&state_root, Some(slot)) .unwrap() .unwrap(); assert_eq!(state.update_tree_hash_cache().unwrap(), state_root); @@ -2526,6 +2527,7 @@ fn check_split_slot(harness: &TestHarness, store: Arc, L /// Check that all the states in a chain dump have the correct tree hash. fn check_chain_dump(harness: &TestHarness, expected_len: u64) { let mut chain_dump = harness.chain.chain_dump().unwrap(); + let split_slot = harness.chain.store.get_split_slot(); assert_eq!(chain_dump.len() as u64, expected_len); @@ -2549,6 +2551,21 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) { .slot(), checkpoint.beacon_state.slot() ); + + // Check presence of execution payload on disk. + if harness.chain.spec.bellatrix_fork_epoch.is_some() { + assert_eq!( + harness + .chain + .store + .execution_payload_exists(&checkpoint.beacon_block_root) + .unwrap(), + checkpoint.beacon_block.slot() >= split_slot, + "incorrect payload storage for block at slot {}: {:?}", + checkpoint.beacon_block.slot(), + checkpoint.beacon_block_root, + ); + } } // Check the forwards block roots iterator against the chain dump diff --git a/beacon_node/http_api/src/block_rewards.rs b/beacon_node/http_api/src/block_rewards.rs index 3b81b894db..05886a4d02 100644 --- a/beacon_node/http_api/src/block_rewards.rs +++ b/beacon_node/http_api/src/block_rewards.rs @@ -4,7 +4,7 @@ use lru::LruCache; use slog::{debug, warn, Logger}; use state_processing::BlockReplayer; use std::sync::Arc; -use types::BeaconBlock; +use types::BlindedBeaconBlock; use warp_utils::reject::{ beacon_chain_error, beacon_state_error, custom_bad_request, custom_server_error, }; @@ -96,7 +96,7 @@ pub fn get_block_rewards( /// Compute block rewards for blocks passed in as input. pub fn compute_block_rewards( - blocks: Vec>, + blocks: Vec>, chain: Arc>, log: Logger, ) -> Result, warp::Rejection> { diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 0cb9c056bf..ec81e7037f 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -25,7 +25,9 @@ use beacon_chain::{ BeaconChainTypes, ProduceBlockVerification, WhenSlotSkipped, }; pub use block_id::BlockId; -use eth2::types::{self as api_types, EndpointVersion, ValidatorId, ValidatorStatus}; +use eth2::types::{ + self as api_types, EndpointVersion, SkipRandaoVerification, ValidatorId, ValidatorStatus, +}; use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage}; use lighthouse_version::version_with_platform; use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage}; @@ -35,7 +37,6 @@ use slot_clock::SlotClock; use ssz::Encode; pub use state_id::StateId; use std::borrow::Cow; -use std::convert::TryInto; use std::future::Future; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; @@ -46,7 +47,7 @@ use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use types::{ Attestation, AttestationData, AttesterSlashing, BeaconStateError, BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, - ProposerPreparationData, ProposerSlashing, RelativeEpoch, Signature, SignedAggregateAndProof, + ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBeaconBlock, SignedBlindedBeaconBlock, SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, @@ -2002,31 +2003,25 @@ pub fn serve( slot: Slot, query: api_types::ValidatorBlocksQuery, chain: Arc>| async move { - let randao_reveal = query.randao_reveal.as_ref().map_or_else( - || { - if query.verify_randao { - Err(warp_utils::reject::custom_bad_request( - "randao_reveal is mandatory unless verify_randao=false".into(), - )) - } else { - Ok(Signature::empty()) - } - }, - |sig_bytes| { - sig_bytes.try_into().map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "randao reveal is not a valid BLS signature: {:?}", - e - )) - }) - }, - )?; + let randao_reveal = query.randao_reveal.decompress().map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "randao reveal is not a valid BLS signature: {:?}", + e + )) + })?; - let randao_verification = if query.verify_randao { - ProduceBlockVerification::VerifyRandao - } else { - ProduceBlockVerification::NoVerification - }; + let randao_verification = + if query.skip_randao_verification == SkipRandaoVerification::Yes { + if !randao_reveal.is_infinity() { + return Err(warp_utils::reject::custom_bad_request( + "randao_reveal must be point-at-infinity if verification is skipped" + .into(), + )); + } + ProduceBlockVerification::NoVerification + } else { + ProduceBlockVerification::VerifyRandao + }; let (block, _) = chain .produce_block_with_verification::>( @@ -2064,31 +2059,25 @@ pub fn serve( |slot: Slot, query: api_types::ValidatorBlocksQuery, chain: Arc>| async move { - let randao_reveal = query.randao_reveal.as_ref().map_or_else( - || { - if query.verify_randao { - Err(warp_utils::reject::custom_bad_request( - "randao_reveal is mandatory unless verify_randao=false".into(), - )) - } else { - Ok(Signature::empty()) - } - }, - |sig_bytes| { - sig_bytes.try_into().map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "randao reveal is not a valid BLS signature: {:?}", - e - )) - }) - }, - )?; + let randao_reveal = query.randao_reveal.decompress().map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "randao reveal is not a valid BLS signature: {:?}", + e + )) + })?; - let randao_verification = if query.verify_randao { - ProduceBlockVerification::VerifyRandao - } else { - ProduceBlockVerification::NoVerification - }; + let randao_verification = + if query.skip_randao_verification == SkipRandaoVerification::Yes { + if !randao_reveal.is_infinity() { + return Err(warp_utils::reject::custom_bad_request( + "randao_reveal must be point-at-infinity if verification is skipped" + .into() + )); + } + ProduceBlockVerification::NoVerification + } else { + ProduceBlockVerification::VerifyRandao + }; let (block, _) = chain .produce_block_with_verification::>( @@ -2103,6 +2092,7 @@ pub fn serve( .to_ref() .fork_name(&chain.spec) .map_err(inconsistent_fork_rejection)?; + // Pose as a V2 endpoint so we return the fork `version`. fork_versioned_response(V2, fork_name, block) .map(|response| warp::reply::json(&response)) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index ca240e64d2..ff664d6ff0 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1939,11 +1939,11 @@ impl ApiTester { let block = self .client - .get_validator_blocks_with_verify_randao::>( + .get_validator_blocks_modular::>( slot, + &Signature::infinity().unwrap().into(), None, - None, - Some(false), + SkipRandaoVerification::Yes, ) .await .unwrap() @@ -1993,45 +1993,23 @@ impl ApiTester { sk.sign(message).into() }; - // Check failure with no `verify_randao` passed. + // Check failure with no `skip_randao_verification` passed. self.client .get_validator_blocks::>(slot, &bad_randao_reveal, None) .await .unwrap_err(); - // Check failure with `verify_randao=true`. + // Check failure with `skip_randao_verification` (requires infinity sig). self.client - .get_validator_blocks_with_verify_randao::>( + .get_validator_blocks_modular::>( slot, - Some(&bad_randao_reveal), + &bad_randao_reveal, None, - Some(true), + SkipRandaoVerification::Yes, ) .await .unwrap_err(); - // Check failure with no randao reveal provided. - self.client - .get_validator_blocks_with_verify_randao::>( - slot, None, None, None, - ) - .await - .unwrap_err(); - - // Check success with `verify_randao=false`. - let block = self - .client - .get_validator_blocks_with_verify_randao::>( - slot, - Some(&bad_randao_reveal), - None, - Some(false), - ) - .await - .unwrap() - .data; - - assert_eq!(block.slot(), slot); self.chain.slot_clock.set_slot(slot.as_u64() + 1); } @@ -2106,11 +2084,11 @@ impl ApiTester { let block = self .client - .get_validator_blinded_blocks_with_verify_randao::( + .get_validator_blinded_blocks_modular::( slot, + &Signature::infinity().unwrap().into(), None, - None, - Some(false), + SkipRandaoVerification::Yes, ) .await .unwrap() @@ -2162,45 +2140,23 @@ impl ApiTester { sk.sign(message).into() }; - // Check failure with no `verify_randao` passed. + // Check failure with full randao verification enabled. self.client .get_validator_blinded_blocks::(slot, &bad_randao_reveal, None) .await .unwrap_err(); - // Check failure with `verify_randao=true`. + // Check failure with `skip_randao_verification` (requires infinity sig). self.client - .get_validator_blinded_blocks_with_verify_randao::( + .get_validator_blinded_blocks_modular::( slot, - Some(&bad_randao_reveal), + &bad_randao_reveal, None, - Some(true), + SkipRandaoVerification::Yes, ) .await .unwrap_err(); - // Check failure with no randao reveal provided. - self.client - .get_validator_blinded_blocks_with_verify_randao::( - slot, None, None, None, - ) - .await - .unwrap_err(); - - // Check success with `verify_randao=false`. - let block = self - .client - .get_validator_blinded_blocks_with_verify_randao::( - slot, - Some(&bad_randao_reveal), - None, - Some(false), - ) - .await - .unwrap() - .data; - - assert_eq!(block.slot(), slot); self.chain.slot_clock.set_slot(slot.as_u64() + 1); } diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 93ed1b463b..307b569a91 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -1752,6 +1752,19 @@ impl Worker { debug!(self.log, "Attestation for finalized state"; "peer_id" => % peer_id); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } + e @ AttnError::BeaconChainError(BeaconChainError::MaxCommitteePromises(_)) => { + debug!( + self.log, + "Dropping attestation"; + "target_root" => ?failed_att.attestation().data.target.root, + "beacon_block_root" => ?beacon_block_root, + "slot" => ?failed_att.attestation().data.slot, + "type" => ?attestation_type, + "error" => ?e, + "peer_id" => % peer_id + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } AttnError::BeaconChainError(e) => { /* * Lighthouse hit an unexpected error whilst processing the attestation. It @@ -1762,7 +1775,10 @@ impl Worker { */ error!( self.log, - "Unable to validate aggregate"; + "Unable to validate attestation"; + "beacon_block_root" => ?beacon_block_root, + "slot" => ?failed_att.attestation().data.slot, + "type" => ?attestation_type, "peer_id" => %peer_id, "error" => ?e, ); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 5691e9a979..5399da258a 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -530,6 +530,16 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { [default: 1]. You may change the compression level freely without re-syncing.") .takes_value(true) ) + .arg( + Arg::with_name("prune-payloads") + .long("prune-payloads") + .help("Prune execution payloads from Lighthouse's database. This saves space but \ + imposes load on the execution client, as payloads need to be \ + reconstructed and sent to syncing peers.") + .takes_value(true) + .default_value("true") + ) + /* * Misc. */ diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 40ef850eda..4cd34aec7a 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -362,6 +362,10 @@ pub fn get_config( .map_err(|_| "auto-compact-db takes a boolean".to_string())?; } + if let Some(prune_payloads) = clap_utils::parse_optional(cli_args, "prune-payloads")? { + client_config.store.prune_payloads = prune_payloads; + } + /* * Zero-ports * diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 63419d89a3..1faec2e60b 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -28,6 +28,8 @@ pub struct StoreConfig { pub compact_on_init: bool, /// Whether to compact the database during database pruning. pub compact_on_prune: bool, + /// Whether to prune payloads on initialization and finalization. + pub prune_payloads: bool, } /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params. @@ -53,6 +55,7 @@ impl Default for StoreConfig { compression_level: DEFAULT_COMPRESSION_LEVEL, compact_on_init: false, compact_on_prune: true, + prune_payloads: true, } } } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 59452653a0..b12f2173d3 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -8,7 +8,7 @@ use crate::config::{ use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator}; use crate::hot_state_iter::HotStateRootIter; use crate::impls::beacon_state::{get_full_state, store_full_state}; -use crate::iter::{ParentRootBlockIterator, StateRootsIterator}; +use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator}; use crate::leveldb_store::{BytesKey, LevelDB}; use crate::memory_store::MemoryStore; use crate::metadata::{ @@ -22,6 +22,7 @@ use crate::{ get_key_for_col, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem, StoreOp, }; +use itertools::process_results; use leveldb::iterator::LevelDBIterator; use lru::LruCache; use milhouse::Diff; @@ -363,8 +364,10 @@ impl, Cold: ItemStore> HotColdDB }; // If the block is after the split point then we should have the full execution payload - // stored in the database. Otherwise, just return the blinded block. - // Hold the split lock so that it can't change. + // stored in the database. If it isn't but payload pruning is disabled, try to load it + // on-demand. + // + // Hold the split lock so that it can't change while loading the payload. let split = self.split.read_recursive(); let block = if blinded_block.message().execution_payload().is_err() @@ -377,6 +380,18 @@ impl, Cold: ItemStore> HotColdDB self.block_cache.lock().put(*block_root, full_block.clone()); DatabaseBlock::Full(full_block) + } else if !self.config.prune_payloads { + // If payload pruning is disabled there's a chance we may have the payload of + // this finalized block. Attempt to load it but don't error in case it's missing. + if let Some(payload) = self.get_execution_payload(block_root)? { + DatabaseBlock::Full( + blinded_block + .try_into_full_block(Some(payload)) + .ok_or(Error::AddPayloadLogicError)?, + ) + } else { + DatabaseBlock::Blinded(blinded_block) + } } else { DatabaseBlock::Blinded(blinded_block) }; @@ -417,7 +432,9 @@ impl, Cold: ItemStore> HotColdDB blinded_block: SignedBeaconBlock>, ) -> Result, Error> { if blinded_block.message().execution_payload().is_ok() { - let execution_payload = self.get_execution_payload(block_root)?; + let execution_payload = self + .get_execution_payload(block_root)? + .ok_or(HotColdDBError::MissingExecutionPayload(*block_root))?; blinded_block.try_into_full_block(Some(execution_payload)) } else { blinded_block.try_into_full_block(None) @@ -462,9 +479,14 @@ impl, Cold: ItemStore> HotColdDB pub fn get_execution_payload( &self, block_root: &Hash256, - ) -> Result, Error> { - self.get_item(block_root)? - .ok_or_else(|| HotColdDBError::MissingExecutionPayload(*block_root).into()) + ) -> Result>, Error> { + self.get_item(block_root) + } + + /// Check if the execution payload for a block exists on disk. + pub fn execution_payload_exists(&self, block_root: &Hash256) -> Result { + self.get_item::>(block_root) + .map(|payload| payload.is_some()) } /// Determine whether a block exists in the database. @@ -1642,6 +1664,113 @@ impl, Cold: ItemStore> HotColdDB &CompactionTimestamp(compaction_timestamp.as_secs()), ) } + + /// Try to prune all execution payloads, returning early if there is no need to prune. + pub fn try_prune_execution_payloads(&self, force: bool) -> Result<(), Error> { + let split = self.get_split_info(); + + if split.slot == 0 { + return Ok(()); + } + + let bellatrix_fork_slot = if let Some(epoch) = self.spec.bellatrix_fork_epoch { + epoch.start_slot(E::slots_per_epoch()) + } else { + return Ok(()); + }; + + // Load the split state so we can backtrack to find execution payloads. + let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or( + HotColdDBError::MissingSplitState(split.state_root, split.slot), + )?; + + // The finalized block may or may not have its execution payload stored, depending on + // whether it was at a skipped slot. However for a fully pruned database its parent + // should *always* have been pruned. In case of a long split (no parent found) we + // continue as if the payloads are pruned, as the node probably has other things to worry + // about. + let split_block_root = split_state.get_latest_block_root(split.state_root); + + let already_pruned = + process_results(split_state.rev_iter_block_roots(&self.spec), |mut iter| { + iter.find(|(_, block_root)| *block_root != split_block_root) + .map_or(Ok(true), |(_, split_parent_root)| { + self.execution_payload_exists(&split_parent_root) + .map(|exists| !exists) + }) + })??; + + if already_pruned && !force { + info!(self.log, "Execution payloads are pruned"); + return Ok(()); + } + + // Iterate block roots backwards to the Bellatrix fork or the anchor slot, whichever comes + // first. + warn!( + self.log, + "Pruning finalized payloads"; + "info" => "you may notice degraded I/O performance while this runs" + ); + let anchor_slot = self.get_anchor_info().map(|info| info.anchor_slot); + + let mut ops = vec![]; + let mut last_pruned_block_root = None; + + for res in std::iter::once(Ok((split_block_root, split.slot))) + .chain(BlockRootsIterator::new(self, &split_state)) + { + let (block_root, slot) = match res { + Ok(tuple) => tuple, + Err(e) => { + warn!( + self.log, + "Stopping payload pruning early"; + "error" => ?e, + ); + break; + } + }; + + if slot < bellatrix_fork_slot { + info!( + self.log, + "Payload pruning reached Bellatrix boundary"; + ); + break; + } + + if Some(block_root) != last_pruned_block_root + && self.execution_payload_exists(&block_root)? + { + debug!( + self.log, + "Pruning execution payload"; + "slot" => slot, + "block_root" => ?block_root, + ); + last_pruned_block_root = Some(block_root); + ops.push(StoreOp::DeleteExecutionPayload(block_root)); + } + + if Some(slot) == anchor_slot { + info!( + self.log, + "Payload pruning reached anchor state"; + "slot" => slot + ); + break; + } + } + let payloads_pruned = ops.len(); + self.do_atomically(ops)?; + info!( + self.log, + "Execution payload pruning complete"; + "payloads_pruned" => payloads_pruned, + ); + Ok(()) + } } /// Advance the split point of the store, moving new finalized states to the freezer. @@ -1687,15 +1816,15 @@ pub fn migrate_database, Cold: ItemStore>( // the cold DB. let mut hot_db_ops: Vec> = Vec::new(); - let state_root_iter = StateRootsIterator::new(&store, finalized_state); - for maybe_pair in state_root_iter.take_while(|result| match result { - Ok((_, slot)) => { + let state_root_iter = RootsIterator::new(&store, finalized_state); + for maybe_tuple in state_root_iter.take_while(|result| match result { + Ok((_, _, slot)) => { slot >= ¤t_split_slot && anchor_slot.map_or(true, |anchor_slot| slot >= &anchor_slot) } Err(_) => true, }) { - let (state_root, slot) = maybe_pair?; + let (block_root, state_root, slot) = maybe_tuple?; let mut cold_db_ops: Vec = Vec::new(); @@ -1719,6 +1848,14 @@ pub fn migrate_database, Cold: ItemStore>( // Delete the old summary, and the full state if we lie on an epoch boundary. hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot))); + + // Delete the execution payload if payload pruning is enabled. At a skipped slot we may + // delete the payload for the finalized block itself, but that's OK as we only guarantee + // that payloads are present for slots >= the split slot. The payload fetching code is also + // forgiving of missing payloads. + if store.config.prune_payloads { + hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root)); + } } // Warning: Critical section. We have to take care not to put any of the two databases in an diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index f096aca97e..104ca9ccd4 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -1233,17 +1233,17 @@ impl BeaconNodeHttpClient { randao_reveal: &SignatureBytes, graffiti: Option<&Graffiti>, ) -> Result>, Error> { - self.get_validator_blocks_with_verify_randao(slot, Some(randao_reveal), graffiti, None) + self.get_validator_blocks_modular(slot, randao_reveal, graffiti, SkipRandaoVerification::No) .await } /// `GET v2/validator/blocks/{slot}` - pub async fn get_validator_blocks_with_verify_randao>( + pub async fn get_validator_blocks_modular>( &self, slot: Slot, - randao_reveal: Option<&SignatureBytes>, + randao_reveal: &SignatureBytes, graffiti: Option<&Graffiti>, - verify_randao: Option, + skip_randao_verification: SkipRandaoVerification, ) -> Result>, Error> { let mut path = self.eth_path(V2)?; @@ -1253,19 +1253,17 @@ impl BeaconNodeHttpClient { .push("blocks") .push(&slot.to_string()); - if let Some(randao_reveal) = randao_reveal { - path.query_pairs_mut() - .append_pair("randao_reveal", &randao_reveal.to_string()); - } + path.query_pairs_mut() + .append_pair("randao_reveal", &randao_reveal.to_string()); if let Some(graffiti) = graffiti { path.query_pairs_mut() .append_pair("graffiti", &graffiti.to_string()); } - if let Some(verify_randao) = verify_randao { + if skip_randao_verification == SkipRandaoVerification::Yes { path.query_pairs_mut() - .append_pair("verify_randao", &verify_randao.to_string()); + .append_pair("skip_randao_verification", ""); } self.get(path).await @@ -1278,25 +1276,22 @@ impl BeaconNodeHttpClient { randao_reveal: &SignatureBytes, graffiti: Option<&Graffiti>, ) -> Result>, Error> { - self.get_validator_blinded_blocks_with_verify_randao( + self.get_validator_blinded_blocks_modular( slot, - Some(randao_reveal), + randao_reveal, graffiti, - None, + SkipRandaoVerification::No, ) .await } /// `GET v1/validator/blinded_blocks/{slot}` - pub async fn get_validator_blinded_blocks_with_verify_randao< - T: EthSpec, - Payload: ExecPayload, - >( + pub async fn get_validator_blinded_blocks_modular>( &self, slot: Slot, - randao_reveal: Option<&SignatureBytes>, + randao_reveal: &SignatureBytes, graffiti: Option<&Graffiti>, - verify_randao: Option, + skip_randao_verification: SkipRandaoVerification, ) -> Result>, Error> { let mut path = self.eth_path(V1)?; @@ -1306,19 +1301,17 @@ impl BeaconNodeHttpClient { .push("blinded_blocks") .push(&slot.to_string()); - if let Some(randao_reveal) = randao_reveal { - path.query_pairs_mut() - .append_pair("randao_reveal", &randao_reveal.to_string()); - } + path.query_pairs_mut() + .append_pair("randao_reveal", &randao_reveal.to_string()); if let Some(graffiti) = graffiti { path.query_pairs_mut() .append_pair("graffiti", &graffiti.to_string()); } - if let Some(verify_randao) = verify_randao { + if skip_randao_verification == SkipRandaoVerification::Yes { path.query_pairs_mut() - .append_pair("verify_randao", &verify_randao.to_string()); + .append_key_only("skip_randao_verification"); } self.get(path).await diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 9a256f5ade..878af57a1f 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -658,16 +658,34 @@ pub struct ProposerData { pub slot: Slot, } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Deserialize)] pub struct ValidatorBlocksQuery { - pub randao_reveal: Option, + pub randao_reveal: SignatureBytes, pub graffiti: Option, - #[serde(default = "default_verify_randao")] - pub verify_randao: bool, + pub skip_randao_verification: SkipRandaoVerification, } -fn default_verify_randao() -> bool { - true +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize)] +#[serde(try_from = "Option")] +pub enum SkipRandaoVerification { + Yes, + #[default] + No, +} + +/// Parse a `skip_randao_verification` query parameter. +impl TryFrom> for SkipRandaoVerification { + type Error = String; + + fn try_from(opt: Option) -> Result { + match opt.as_deref() { + None => Ok(SkipRandaoVerification::No), + Some("") => Ok(SkipRandaoVerification::Yes), + Some(s) => Err(format!( + "skip_randao_verification does not take a value, got: {s}" + )), + } + } } #[derive(Clone, Serialize, Deserialize)] diff --git a/common/oneshot_broadcast/Cargo.toml b/common/oneshot_broadcast/Cargo.toml new file mode 100644 index 0000000000..baefe10661 --- /dev/null +++ b/common/oneshot_broadcast/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "oneshot_broadcast" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +parking_lot = "0.12.0" diff --git a/common/oneshot_broadcast/src/lib.rs b/common/oneshot_broadcast/src/lib.rs new file mode 100644 index 0000000000..2c616b3bb3 --- /dev/null +++ b/common/oneshot_broadcast/src/lib.rs @@ -0,0 +1,190 @@ +//! Provides a single-sender, multiple receiver one-shot channel where any message sent will be +//! received by all senders. +//! +//! This implementation may not be blazingly fast but it should be simple enough to be reliable. +use parking_lot::{Condvar, Mutex}; +use std::sync::Arc; + +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum Error { + SenderDropped, +} + +enum Future { + /// The future is ready and the item may be consumed. + Ready(T), + /// Future is not ready. + NotReady, + /// The sender has been dropped without sending a message. + SenderDropped, +} + +struct MutexCondvar { + mutex: Mutex>, + condvar: Condvar, +} + +/// The sending pair of the `oneshot` channel. +pub struct Sender(Arc>); + +impl Sender { + /// Send a message, consuming `self` and delivering the message to *all* receivers. + pub fn send(self, item: T) { + *self.0.mutex.lock() = Future::Ready(item); + // Condvar notification will be handled by the `Drop` implementation. + } +} + +impl Drop for Sender { + /// Flag the sender as dropped and notify all receivers. + fn drop(&mut self) { + let mut lock = self.0.mutex.lock(); + if !matches!(*lock, Future::Ready(_)) { + *lock = Future::SenderDropped + } + self.0.condvar.notify_all(); + // The lock must be held whilst the condvar is notified. + drop(lock); + } +} + +/// The receiving pair of the `oneshot` channel. Always receives the message sent by the `Sender` +/// (if any). +#[derive(Clone)] +pub struct Receiver(Arc>); + +impl Receiver { + /// Check to see if there is a message to be read *without* blocking/waiting. + /// + /// ## Note + /// + /// This method will technically perform *some* blocking to access a `Mutex`. It is non-blocking + /// in the sense that it won't block until a message is received (i.e., it may return `Ok(None)` + /// if no message has been sent yet). + pub fn try_recv(&self) -> Result, Error> { + match &*self.0.mutex.lock() { + Future::Ready(item) => Ok(Some(item.clone())), + Future::NotReady => Ok(None), + Future::SenderDropped => Err(Error::SenderDropped), + } + } + + /// Check to see if there is a message to be read whilst blocking/waiting until a message is + /// sent or the `Sender` is dropped. + pub fn recv(self) -> Result { + let mut lock = self.0.mutex.lock(); + loop { + match &*lock { + Future::Ready(item) => return Ok(item.clone()), + Future::NotReady => self.0.condvar.wait(&mut lock), + Future::SenderDropped => return Err(Error::SenderDropped), + } + } + } +} + +/// A single-sender, multiple-receiver broadcast channel. +/// +/// The sender may send *only one* message which will be received by *all* receivers. +pub fn oneshot() -> (Sender, Receiver) { + let mutex_condvar = Arc::new(MutexCondvar { + mutex: Mutex::new(Future::NotReady), + condvar: Condvar::new(), + }); + let receiver = Receiver(mutex_condvar.clone()); + let sender = Sender(mutex_condvar); + (sender, receiver) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::thread; + use std::time::Duration; + + #[test] + fn single_thread_try_recv() { + let (sender, receiver) = oneshot(); + assert_eq!(receiver.try_recv(), Ok(None)); + sender.send(42); + assert_eq!(receiver.try_recv(), Ok(Some(42))); + } + + #[test] + fn single_thread_try_recv_no_message() { + let (sender, receiver) = oneshot::(); + assert_eq!(receiver.try_recv(), Ok(None)); + drop(sender); + assert_eq!(receiver.try_recv(), Err(Error::SenderDropped)); + } + + #[test] + fn single_thread_recv() { + let (sender, receiver) = oneshot(); + assert_eq!(receiver.try_recv(), Ok(None)); + sender.send(42); + assert_eq!(receiver.recv(), Ok(42)); + } + + #[test] + fn single_thread_recv_no_message() { + let (sender, receiver) = oneshot::(); + assert_eq!(receiver.try_recv(), Ok(None)); + drop(sender); + assert_eq!(receiver.recv(), Err(Error::SenderDropped)); + } + + #[test] + fn two_threads_message_sent() { + let (sender, receiver) = oneshot(); + + let handle = thread::spawn(|| receiver.recv().unwrap()); + + sender.send(42); + assert_eq!(handle.join().unwrap(), 42); + } + + #[test] + fn three_threads_message_set() { + let (sender, receiver) = oneshot(); + + let receiver_a = receiver.clone(); + let handle_a = thread::spawn(|| receiver_a.recv().unwrap()); + let handle_b = thread::spawn(|| receiver.recv().unwrap()); + + sender.send(42); + assert_eq!(handle_a.join().unwrap(), 42); + assert_eq!(handle_b.join().unwrap(), 42); + } + + #[test] + fn three_threads_sender_dropped() { + let (sender, receiver) = oneshot::(); + + let receiver_a = receiver.clone(); + let handle_a = thread::spawn(|| receiver_a.recv()); + let handle_b = thread::spawn(|| receiver.recv()); + + drop(sender); + assert_eq!(handle_a.join().unwrap(), Err(Error::SenderDropped)); + assert_eq!(handle_b.join().unwrap(), Err(Error::SenderDropped)); + } + + #[test] + fn sender_dropped_after_recv() { + let (sender_a, receiver_a) = oneshot(); + let (sender_b, receiver_b) = oneshot::(); + + let handle_0 = thread::spawn(|| { + sender_a.send(1); + receiver_b.recv() + }); + + assert_eq!(receiver_a.recv(), Ok(1)); + // This is a slightly hacky sleep that assumes that the thread has had enough time after + // sending down `sender_a` to start listening to `receiver_b`. + thread::sleep(Duration::from_secs(1)); + drop(sender_b); + assert_eq!(handle_0.join().unwrap(), Err(Error::SenderDropped)) + } +} diff --git a/consensus/ssz/src/decode/try_from_iter.rs b/consensus/ssz/src/decode/try_from_iter.rs index 22db02d4fc..1ff89a107f 100644 --- a/consensus/ssz/src/decode/try_from_iter.rs +++ b/consensus/ssz/src/decode/try_from_iter.rs @@ -29,11 +29,18 @@ pub trait TryFromIter: Sized { impl TryFromIter for Vec { type Error = Infallible; - fn try_from_iter(iter: I) -> Result + fn try_from_iter(values: I) -> Result where I: IntoIterator, { - Ok(Self::from_iter(iter)) + // Pre-allocate the expected size of the Vec, which is parsed from the SSZ input bytes as + // `num_items`. This length has already been checked to be less than or equal to the type's + // maximum length in `decode_list_of_variable_length_items`. + let iter = values.into_iter(); + let (_, opt_max_len) = iter.size_hint(); + let mut vec = Vec::with_capacity(opt_max_len.unwrap_or(0)); + vec.extend(iter); + Ok(vec) } } diff --git a/consensus/tree_hash/src/impls.rs b/consensus/tree_hash/src/impls.rs index 2c7ade30d0..cf05d2a3d5 100644 --- a/consensus/tree_hash/src/impls.rs +++ b/consensus/tree_hash/src/impls.rs @@ -1,6 +1,5 @@ use super::*; use ethereum_types::{H160, H256, U128, U256}; -use smallvec::{smallvec, ToSmallVec}; fn int_to_hash256(int: u64) -> Hash256 { let mut bytes = [0; HASHSIZE]; @@ -16,7 +15,7 @@ macro_rules! impl_for_bitsize { } fn tree_hash_packed_encoding(&self) -> PackedEncoding { - self.to_le_bytes().to_smallvec() + PackedEncoding::from_slice(&self.to_le_bytes()) } fn tree_hash_packing_factor() -> usize { @@ -89,9 +88,9 @@ impl TreeHash for U128 { } fn tree_hash_packed_encoding(&self) -> PackedEncoding { - let mut result = smallvec![0; 16]; + let mut result = [0; 16]; self.to_little_endian(&mut result); - result + PackedEncoding::from_slice(&result) } fn tree_hash_packing_factor() -> usize { @@ -111,9 +110,9 @@ impl TreeHash for U256 { } fn tree_hash_packed_encoding(&self) -> PackedEncoding { - let mut result = smallvec![0; 32]; + let mut result = [0; 32]; self.to_little_endian(&mut result); - result + PackedEncoding::from_slice(&result) } fn tree_hash_packing_factor() -> usize { @@ -133,9 +132,9 @@ impl TreeHash for H160 { } fn tree_hash_packed_encoding(&self) -> PackedEncoding { - let mut result = smallvec![0; 32]; + let mut result = [0; 32]; result[0..20].copy_from_slice(self.as_bytes()); - result + PackedEncoding::from_slice(&result) } fn tree_hash_packing_factor() -> usize { @@ -155,7 +154,7 @@ impl TreeHash for H256 { } fn tree_hash_packed_encoding(&self) -> PackedEncoding { - self.as_bytes().to_smallvec() + PackedEncoding::from_slice(self.as_bytes()) } fn tree_hash_packing_factor() -> usize { diff --git a/consensus/tree_hash/src/lib.rs b/consensus/tree_hash/src/lib.rs index d6ffe7b072..6e8e9657e0 100644 --- a/consensus/tree_hash/src/lib.rs +++ b/consensus/tree_hash/src/lib.rs @@ -14,9 +14,10 @@ pub const BYTES_PER_CHUNK: usize = 32; pub const HASHSIZE: usize = 32; pub const MERKLE_HASH_CHUNK: usize = 2 * BYTES_PER_CHUNK; pub const MAX_UNION_SELECTOR: u8 = 127; +pub const SMALLVEC_SIZE: usize = 32; pub type Hash256 = ethereum_types::H256; -pub type PackedEncoding = SmallVec<[u8; BYTES_PER_CHUNK]>; +pub type PackedEncoding = SmallVec<[u8; SMALLVEC_SIZE]>; /// Convenience method for `MerkleHasher` which also provides some fast-paths for small trees. /// diff --git a/consensus/tree_hash/tests/tests.rs b/consensus/tree_hash/tests/tests.rs index 7ad033766c..8b2a4b21be 100644 --- a/consensus/tree_hash/tests/tests.rs +++ b/consensus/tree_hash/tests/tests.rs @@ -1,5 +1,5 @@ use ssz_derive::Encode; -use tree_hash::{Hash256, MerkleHasher, TreeHash, BYTES_PER_CHUNK}; +use tree_hash::{Hash256, MerkleHasher, PackedEncoding, TreeHash, BYTES_PER_CHUNK}; use tree_hash_derive::TreeHash; #[derive(Encode)] @@ -18,7 +18,7 @@ impl tree_hash::TreeHash for HashVec { tree_hash::TreeHashType::List } - fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { + fn tree_hash_packed_encoding(&self) -> PackedEncoding { unreachable!("List should never be packed.") } diff --git a/consensus/types/src/beacon_block.rs b/consensus/types/src/beacon_block.rs index 70cf0812d4..83befb2e25 100644 --- a/consensus/types/src/beacon_block.rs +++ b/consensus/types/src/beacon_block.rs @@ -66,6 +66,8 @@ pub struct BeaconBlock = FullPayload> { pub body: BeaconBlockBodyMerge, } +pub type BlindedBeaconBlock = BeaconBlock>; + impl> SignedRoot for BeaconBlock {} impl<'a, T: EthSpec, Payload: ExecPayload> SignedRoot for BeaconBlockRef<'a, T, Payload> {} diff --git a/consensus/types/src/graffiti.rs b/consensus/types/src/graffiti.rs index 91e41600fc..73beb82649 100644 --- a/consensus/types/src/graffiti.rs +++ b/consensus/types/src/graffiti.rs @@ -7,7 +7,7 @@ use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; use ssz::{Decode, DecodeError, Encode}; use std::fmt; use std::str::FromStr; -use tree_hash::TreeHash; +use tree_hash::{PackedEncoding, TreeHash}; pub const GRAFFITI_BYTES_LEN: usize = 32; @@ -159,7 +159,7 @@ impl TreeHash for Graffiti { <[u8; GRAFFITI_BYTES_LEN]>::tree_hash_type() } - fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { + fn tree_hash_packed_encoding(&self) -> PackedEncoding { self.0.tree_hash_packed_encoding() } diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 13f1fbd621..f10c8b4fd6 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -98,7 +98,7 @@ pub use crate::attestation_duty::AttestationDuty; pub use crate::attester_slashing::AttesterSlashing; pub use crate::beacon_block::{ BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, BeaconBlockRef, - BeaconBlockRefMut, + BeaconBlockRefMut, BlindedBeaconBlock, }; pub use crate::beacon_block_body::{ BeaconBlockBody, BeaconBlockBodyAltair, BeaconBlockBodyBase, BeaconBlockBodyMerge, diff --git a/consensus/types/src/participation_flags.rs b/consensus/types/src/participation_flags.rs index 80835521b6..a2dd494864 100644 --- a/consensus/types/src/participation_flags.rs +++ b/consensus/types/src/participation_flags.rs @@ -3,7 +3,7 @@ use safe_arith::{ArithError, SafeArith}; use serde_derive::{Deserialize, Serialize}; use ssz::{Decode, DecodeError, Encode}; use test_random_derive::TestRandom; -use tree_hash::{TreeHash, TreeHashType}; +use tree_hash::{PackedEncoding, TreeHash, TreeHashType}; #[derive(Debug, Default, Clone, Copy, PartialEq, Deserialize, Serialize, TestRandom)] #[serde(transparent)] @@ -78,7 +78,7 @@ impl TreeHash for ParticipationFlags { u8::tree_hash_type() } - fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { + fn tree_hash_packed_encoding(&self) -> PackedEncoding { self.bits.tree_hash_packed_encoding() } diff --git a/consensus/types/src/payload.rs b/consensus/types/src/payload.rs index 0a0432e7eb..667fff58c7 100644 --- a/consensus/types/src/payload.rs +++ b/consensus/types/src/payload.rs @@ -7,7 +7,7 @@ use std::convert::TryFrom; use std::fmt::Debug; use std::hash::Hash; use test_random_derive::TestRandom; -use tree_hash::TreeHash; +use tree_hash::{PackedEncoding, TreeHash}; #[derive(Debug)] pub enum BlockType { @@ -175,7 +175,7 @@ impl TreeHash for BlindedPayload { >::tree_hash_type() } - fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { + fn tree_hash_packed_encoding(&self) -> PackedEncoding { self.execution_payload_header.tree_hash_packed_encoding() } diff --git a/crypto/bls/src/generic_signature.rs b/crypto/bls/src/generic_signature.rs index 10ef75fc68..01e5ed1d48 100644 --- a/crypto/bls/src/generic_signature.rs +++ b/crypto/bls/src/generic_signature.rs @@ -80,6 +80,18 @@ where self.point.is_none() } + /// Initialize self to the point-at-infinity. + /// + /// In general `AggregateSignature::infinity` should be used in favour of this function. + pub fn infinity() -> Result { + Self::deserialize(&INFINITY_SIGNATURE) + } + + /// Returns `true` if `self` is equal to the point at infinity. + pub fn is_infinity(&self) -> bool { + self.is_infinity + } + /// Returns a reference to the underlying BLS point. pub(crate) fn point(&self) -> Option<&Sig> { self.point.as_ref() diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 50295df4b0..20147adb9f 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -59,6 +59,12 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> { ) } +pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> { + App::new("prune_payloads") + .setting(clap::AppSettings::ColoredHelp) + .about("Prune finalized execution payloads") +} + pub fn cli_app<'a, 'b>() -> App<'a, 'b> { App::new(CMD) .visible_aliases(&["db"]) @@ -85,6 +91,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .subcommand(migrate_cli_app()) .subcommand(version_cli_app()) .subcommand(inspect_cli_app()) + .subcommand(prune_payloads_app()) } fn parse_client_config( @@ -257,6 +264,30 @@ pub fn migrate_db( ) } +pub fn prune_payloads( + client_config: ClientConfig, + runtime_context: &RuntimeContext, + log: Logger, +) -> Result<(), Error> { + let spec = &runtime_context.eth2_config.spec; + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + + let db = HotColdDB::, LevelDB>::open( + &hot_path, + &cold_path, + |_, _, _| Ok(()), + client_config.store, + spec.clone(), + log, + )?; + + // If we're trigging a prune manually then ignore the check on the split's parent that bails + // out early. + let force = true; + db.try_prune_execution_payloads(force) +} + /// Run the database manager, returning an error string if the operation did not succeed. pub fn run(cli_args: &ArgMatches<'_>, mut env: Environment) -> Result<(), String> { let client_config = parse_client_config(cli_args, &env)?; @@ -273,6 +304,7 @@ pub fn run(cli_args: &ArgMatches<'_>, mut env: Environment) -> Re let inspect_config = parse_inspect_config(cli_args)?; inspect_db(inspect_config, client_config, &context, log) } + ("prune_payloads", Some(_)) => prune_payloads(client_config, &context, log), _ => { return Err("Unknown subcommand, for help `lighthouse database_manager --help`".into()) } diff --git a/lcli/src/block_root.rs b/lcli/src/block_root.rs new file mode 100644 index 0000000000..7631872c5c --- /dev/null +++ b/lcli/src/block_root.rs @@ -0,0 +1,103 @@ +//! # Block Root +//! +//! Use this tool to compute the canonical root of a `SignedBeaconBlock`. This is most likely only +//! useful for benchmarking with tools like `flamegraph`. +//! +//! It can load a block from a SSZ file or download it from a beaconAPI. +//! +//! Logging output is controlled via the `RUST_LOG` environment variable. For example, `export +//! RUST_LOG=debug`. +//! +//! ## Examples +//! +//! Download a block and re-compute the canonical root 5,000 times. +//! +//! ```ignore +//! lcli block-root \ +//! --beacon-url http://localhost:5052 \ +//! --block-id 0x3d887d30ee25c9c1ce7621ec30a7b49b07d6a03200df9c7206faca52a533f432 \ +//! --runs 5000 +//! ``` +//! +//! Load a block from SSZ and compute the canonical root once. +//! +//! ```ignore +//! lcli block-root \ +//! --block-path /tmp/block.ssz \ +//! --runs 1 +//! ``` +use crate::transition_blocks::load_from_ssz_with; +use clap::ArgMatches; +use clap_utils::{parse_optional, parse_required}; +use environment::Environment; +use eth2::{types::BlockId, BeaconNodeHttpClient, SensitiveUrl, Timeouts}; +use std::path::PathBuf; +use std::time::{Duration, Instant}; +use types::{EthSpec, FullPayload, SignedBeaconBlock}; + +const HTTP_TIMEOUT: Duration = Duration::from_secs(5); + +pub fn run(mut env: Environment, matches: &ArgMatches) -> Result<(), String> { + let spec = &T::default_spec(); + let executor = env.core_context().executor; + + /* + * Parse (most) CLI arguments. + */ + + let block_path: Option = parse_optional(matches, "block-path")?; + let beacon_url: Option = parse_optional(matches, "beacon-url")?; + let runs: usize = parse_required(matches, "runs")?; + + info!("Using {} spec", T::spec_name()); + info!("Doing {} runs", runs); + + /* + * Load the block and pre-state from disk or beaconAPI URL. + */ + + let block: SignedBeaconBlock> = match (block_path, beacon_url) { + (Some(block_path), None) => { + info!("Block path: {:?}", block_path); + load_from_ssz_with(&block_path, spec, SignedBeaconBlock::from_ssz_bytes)? + } + (None, Some(beacon_url)) => { + let block_id: BlockId = parse_required(matches, "block-id")?; + let client = BeaconNodeHttpClient::new(beacon_url, Timeouts::set_all(HTTP_TIMEOUT)); + executor + .handle() + .ok_or("shutdown in progress")? + .block_on(async move { + let block = client + .get_beacon_blocks(block_id) + .await + .map_err(|e| format!("Failed to download block: {:?}", e))? + .ok_or_else(|| format!("Unable to locate block at {:?}", block_id))? + .data; + Ok::<_, String>(block) + }) + .map_err(|e| format!("Failed to complete task: {:?}", e))? + } + _ => return Err("must supply --block-path *or* --beacon-url".into()), + }; + + /* + * Perform the core "runs". + */ + + let mut block_root = None; + for i in 0..runs { + let start = Instant::now(); + + block_root = Some(block.canonical_root()); + + let duration = Instant::now().duration_since(start); + info!("Run {}: {:?}", i, duration); + } + + if let Some(block_root) = block_root { + info!("Block root is {:?}", block_root); + } + + Ok(()) +} diff --git a/lcli/src/main.rs b/lcli/src/main.rs index e6a4eeeacb..84b951e3f1 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -1,5 +1,6 @@ #[macro_use] extern crate log; +mod block_root; mod change_genesis_time; mod check_deposit_data; mod create_payload_header; @@ -714,6 +715,42 @@ fn main() { .help("List of Attestations to convert to indexed form (JSON)"), ) ) + .subcommand( + SubCommand::with_name("block-root") + .about("Computes the block root of some block") + .arg( + Arg::with_name("block-path") + .long("block-path") + .value_name("PATH") + .takes_value(true) + .conflicts_with("beacon-url") + .requires("pre-state-path") + .help("Path to load a SignedBeaconBlock from file as SSZ."), + ) + .arg( + Arg::with_name("beacon-url") + .long("beacon-url") + .value_name("URL") + .takes_value(true) + .help("URL to a beacon-API provider."), + ) + .arg( + Arg::with_name("block-id") + .long("block-id") + .value_name("BLOCK_ID") + .takes_value(true) + .requires("beacon-url") + .help("Identifier for a block as per beacon-API standards (slot, root, etc.)"), + ) + .arg( + Arg::with_name("runs") + .long("runs") + .value_name("INTEGER") + .takes_value(true) + .default_value("1") + .help("Number of repeat runs, useful for benchmarking."), + ) + ) .get_matches(); let result = matches @@ -799,6 +836,8 @@ fn run( .map_err(|e| format!("Failed to run insecure-validators command: {}", e)), ("indexed-attestations", Some(matches)) => indexed_attestations::run::(matches) .map_err(|e| format!("Failed to run indexed-attestations command: {}", e)), + ("block-root", Some(matches)) => block_root::run::(env, matches) + .map_err(|e| format!("Failed to run block-root command: {}", e)), (other, _) => Err(format!("Unknown subcommand {}. See --help.", other)), } } diff --git a/lcli/src/transition_blocks.rs b/lcli/src/transition_blocks.rs index f0db165f9a..67a3e675c0 100644 --- a/lcli/src/transition_blocks.rs +++ b/lcli/src/transition_blocks.rs @@ -6,6 +6,9 @@ //! It can load states and blocks from file or pull them from a beaconAPI. Objects pulled from a //! beaconAPI can be saved to disk to reduce future calls to that server. //! +//! Logging output is controlled via the `RUST_LOG` environment variable. For example, `export +//! RUST_LOG=debug`. +//! //! ## Examples //! //! ### Run using a block from a beaconAPI @@ -124,8 +127,8 @@ pub fn run(mut env: Environment, matches: &ArgMatches) -> Result< let (mut pre_state, mut state_root_opt, block) = match (pre_state_path, block_path, beacon_url) { (Some(pre_state_path), Some(block_path), None) => { - info!("Block path: {:?}", pre_state_path); - info!("Pre-state path: {:?}", block_path); + info!("Block path: {:?}", block_path); + info!("Pre-state path: {:?}", pre_state_path); let pre_state = load_from_ssz_with(&pre_state_path, spec, BeaconState::from_ssz_bytes)?; let block = load_from_ssz_with(&block_path, spec, SignedBeaconBlock::from_ssz_bytes)?; (pre_state, None, block) diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index b28c1a0c3e..661bbcdb0c 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1227,6 +1227,19 @@ fn compact_db_flag() { .with_config(|config| assert!(config.store.compact_on_init)); } #[test] +fn prune_payloads_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert!(config.store.prune_payloads)); +} +#[test] +fn prune_payloads_on_startup_false() { + CommandLineTest::new() + .flag("prune-payloads", Some("false")) + .run_with_zero_port() + .with_config(|config| assert!(!config.store.prune_payloads)); +} +#[test] fn reconstruct_historic_states_flag() { CommandLineTest::new() .flag("reconstruct-historic-states", None)