Optimize validator duties (#2243)

## Issue Addressed

Closes #2052

## Proposed Changes

- Refactor the attester/proposer duties endpoints in the BN
    - Performance improvements
    - Fixes some potential inconsistencies with the dependent root fields.
    - Removes `http_api::beacon_proposer_cache` and just uses the one on the `BeaconChain` instead.
    - Move the code for the proposer/attester duties endpoints into separate files, for readability.
- Refactor the `DutiesService` in the VC
    - Required to reduce the delay on broadcasting new blocks.
    - Gets rid of the `ValidatorDuty` shim struct that came about when we adopted the standard API.
    - Separate block/attestation duty tasks so that they don't block each other when one is slow.
- In the VC, use `PublicKeyBytes` to represent validators instead of `PublicKey`. `PublicKey` is a legit crypto object whilst `PublicKeyBytes` is just a byte-array; it's much faster to clone/hash `PublicKeyBytes`, and this change has had a significant impact on runtimes.
    - Unfortunately this has created lots of dust changes.
- In the BN, store `PublicKeyBytes` in the `beacon_proposer_cache` and allow access to them. The HTTP API always sends `PublicKeyBytes` over the wire and the conversion from `PublicKey` -> `PublicKeyBytes` is non-trivial, especially when queries have 100s/1000s of validators (like Pyrmont).
- Add the `state_processing::state_advance` mod which dedups a lot of the "apply `n` skip slots to the state" code.
    - This also fixes a bug with some functions which were failing to include a state root as per [this comment](072695284f/consensus/state_processing/src/state_advance.rs (L69-L74)). I couldn't find any instance of this bug that resulted in anything more severe than keying a shuffling cache by the wrong block root.
- Swap the VC block service to use `mpsc` from `tokio` instead of `futures`. This is consistent with the rest of the codebase.
    
~~This PR *reduces* the size of the codebase 🎉~~ It *used* to reduce the size of the codebase before I added more comments.
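The `PublicKeyBytes` change can be illustrated with a minimal, self-contained sketch (hypothetical stand-in type, *not* the actual Lighthouse definition): a fixed-size byte-array wrapper derives `Copy`/`Hash` for free, so cloning it and using it as a map key are cheap byte-wise operations with no crypto work.

```rust
use std::collections::HashMap;

/// Stand-in for the real `PublicKeyBytes`: just 48 raw bytes, so `Clone`,
/// `Copy`, `Eq` and `Hash` are all trivial byte-wise operations. A "real"
/// `PublicKey` would need to be deserialized and validated first.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct PublicKeyBytes([u8; 48]);

fn main() {
    // Keying per-validator state by the byte wrapper avoids any crypto work
    // on clone/hash, which matters when queries touch hundreds or thousands
    // of validators.
    let mut duty_epochs: HashMap<PublicKeyBytes, u64> = HashMap::new();
    let pubkey = PublicKeyBytes([7; 48]);
    duty_epochs.insert(pubkey, 42);
    assert_eq!(duty_epochs.get(&pubkey), Some(&42));
}
```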

## Observations on Pyrmont

- Proposer duties response times down from peaks of 450ms to a consistent <1ms.
- Current epoch attester duties response times down from peaks of >1s to a consistent 20-30ms.
- Block production times down from >600ms to 100-200ms.
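For context on the dependent-root inconsistencies mentioned above, the slots those roots are derived from come down to plain arithmetic (a sketch with mainnet `SLOTS_PER_EPOCH` hard-coded as an assumption; the real code reads it from `EthSpec`): proposer shuffling for an epoch is fixed at the end of the previous epoch, while attester shuffling has a one-epoch look-ahead and so is fixed one epoch earlier still.

```rust
const SLOTS_PER_EPOCH: u64 = 32; // mainnet value; assumed for the sketch

/// Proposer shuffling for `epoch` depends on the block root at the last slot
/// of the previous epoch.
fn proposer_dependent_slot(epoch: u64) -> u64 {
    epoch * SLOTS_PER_EPOCH - 1
}

/// Attester shuffling has one epoch of look-ahead, so it depends on the last
/// slot two epochs before the request epoch.
fn attester_dependent_slot(epoch: u64) -> u64 {
    (epoch - 1) * SLOTS_PER_EPOCH - 1
}

fn main() {
    assert_eq!(proposer_dependent_slot(3), 95); // last slot of epoch 2
    assert_eq!(attester_dependent_slot(3), 63); // last slot of epoch 1
}
```

Both formulas appear (in their `Slot`/`Epoch` typed form) in the removed handler code further down.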

## Additional Info

- ~~Blocked on #2241~~
- ~~Blocked on #2234~~

## TODO

- [x] ~~Refactor this into some smaller PRs?~~ Leaving this as-is for now.
- [x] Address `per_slot_processing` roots.
- [x] Investigate slow next epoch times. Not getting added to cache on block processing?
- [x] Consider [this](072695284f/beacon_node/store/src/hot_cold_store.rs (L811-L812)) in the scenario of replacing the state roots


Co-authored-by: pawan <pawandhananjay@gmail.com>
Co-authored-by: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
Paul Hauner
2021-03-17 05:09:57 +00:00
parent 6a69b20be1
commit 015ab7d0a7
49 changed files with 2201 additions and 1833 deletions


@@ -5,9 +5,10 @@
//! There are also some additional, non-standard endpoints behind the `/lighthouse/` path which are
//! used for development.
mod beacon_proposer_cache;
mod attester_duties;
mod block_id;
mod metrics;
mod proposer_duties;
mod state_id;
mod validator_inclusion;
@@ -17,19 +18,16 @@ use beacon_chain::{
validator_monitor::{get_block_delay_ms, timestamp_now},
AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use beacon_proposer_cache::BeaconProposerCache;
use block_id::BlockId;
use eth2::types::{self as api_types, ValidatorId};
use eth2_libp2p::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
use lighthouse_version::version_with_platform;
use network::NetworkMessage;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use slog::{crit, debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
use state_id::StateId;
use state_processing::per_slot_processing;
use std::borrow::Cow;
use std::convert::TryInto;
use std::future::Future;
@@ -38,9 +36,8 @@ use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{
Attestation, AttestationDuty, AttesterSlashing, CloneConfig, CommitteeCache, Epoch, EthSpec,
Hash256, ProposerSlashing, PublicKey, PublicKeyBytes, RelativeEpoch, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, Slot, YamlConfig,
Attestation, AttesterSlashing, CommitteeCache, Epoch, EthSpec, ProposerSlashing, RelativeEpoch,
SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, YamlConfig,
};
use warp::http::StatusCode;
use warp::sse::Event;
@@ -240,30 +237,6 @@ pub fn serve<T: BeaconChainTypes>(
let eth1_v1 = warp::path(API_PREFIX).and(warp::path(API_VERSION));
// Instantiate the beacon proposer cache.
let beacon_proposer_cache = ctx
.chain
.as_ref()
.map(|chain| BeaconProposerCache::new(&chain))
.transpose()
.map_err(|e| format!("Unable to initialize beacon proposer cache: {:?}", e))?
.map(Mutex::new)
.map(Arc::new);
// Create a `warp` filter that provides access to the proposer cache.
let beacon_proposer_cache = || {
warp::any()
.map(move || beacon_proposer_cache.clone())
.and_then(|beacon_proposer_cache| async move {
match beacon_proposer_cache {
Some(cache) => Ok(cache),
None => Err(warp_utils::reject::custom_not_found(
"Beacon proposer cache is not initialized.".to_string(),
)),
}
})
};
// Create a `warp` filter that provides access to the network globals.
let inner_network_globals = ctx.network_globals.clone();
let network_globals = warp::any()
@@ -1674,89 +1647,10 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(chain_filter.clone())
.and(beacon_proposer_cache())
.and_then(
|epoch: Epoch,
chain: Arc<BeaconChain<T>>,
beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>>| {
blocking_json_task(move || {
let current_epoch = chain
.epoch()
.map_err(warp_utils::reject::beacon_chain_error)?;
if epoch > current_epoch {
return Err(warp_utils::reject::custom_bad_request(format!(
"request epoch {} is ahead of the current epoch {}",
epoch, current_epoch
)));
}
if epoch == current_epoch {
let dependent_root_slot = current_epoch
.start_slot(T::EthSpec::slots_per_epoch()) - 1;
let dependent_root = if dependent_root_slot > chain.best_slot().map_err(warp_utils::reject::beacon_chain_error)? {
chain.head_beacon_block_root().map_err(warp_utils::reject::beacon_chain_error)?
} else {
chain
.root_at_slot(dependent_root_slot)
.map_err(warp_utils::reject::beacon_chain_error)?
.unwrap_or(chain.genesis_block_root)
};
beacon_proposer_cache
.lock()
.get_proposers(&chain, epoch)
.map(|duties| api_types::DutiesResponse { data: duties, dependent_root })
} else {
let state =
StateId::slot(epoch.start_slot(T::EthSpec::slots_per_epoch()))
.state(&chain)?;
let dependent_root_slot = state.current_epoch()
.start_slot(T::EthSpec::slots_per_epoch()) - 1;
let dependent_root = if dependent_root_slot > chain.best_slot().map_err(warp_utils::reject::beacon_chain_error)? {
chain.head_beacon_block_root().map_err(warp_utils::reject::beacon_chain_error)?
} else {
chain
.root_at_slot(dependent_root_slot)
.map_err(warp_utils::reject::beacon_chain_error)?
.unwrap_or(chain.genesis_block_root)
};
epoch
.slot_iter(T::EthSpec::slots_per_epoch())
.map(|slot| {
state
.get_beacon_proposer_index(slot, &chain.spec)
.map_err(warp_utils::reject::beacon_state_error)
.and_then(|i| {
let pubkey =
chain.validator_pubkey(i)
.map_err(warp_utils::reject::beacon_chain_error)?
.ok_or_else(||
warp_utils::reject::beacon_chain_error(
BeaconChainError::ValidatorPubkeyCacheIncomplete(i)
)
)?;
Ok(api_types::ProposerData {
pubkey: PublicKeyBytes::from(pubkey),
validator_index: i as u64,
slot,
})
})
})
.collect::<Result<Vec<api_types::ProposerData>, _>>()
.map(|duties| {
api_types::DutiesResponse {
dependent_root,
data: duties,
}
})
}
})
},
);
.and(log_filter.clone())
.and_then(|epoch: Epoch, chain: Arc<BeaconChain<T>>, log: Logger| {
blocking_json_task(move || proposer_duties::proposer_duties(epoch, &chain, &log))
});
// GET validator/blocks/{slot}
let get_validator_blocks = eth1_v1
@@ -1865,188 +1759,7 @@ pub fn serve<T: BeaconChainTypes>(
.and_then(
|epoch: Epoch, indices: api_types::ValidatorIndexData, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
let current_epoch = chain
.epoch()
.map_err(warp_utils::reject::beacon_chain_error)?;
if epoch > current_epoch + 1 {
return Err(warp_utils::reject::custom_bad_request(format!(
"request epoch {} is more than one epoch past the current epoch {}",
epoch, current_epoch
)));
}
let validator_count = StateId::head()
.map_state(&chain, |state| Ok(state.validators.len() as u64))?;
let pubkeys = indices
.0
.iter()
.filter(|i| **i < validator_count as u64)
.map(|i| {
let pubkey = chain
.validator_pubkey(*i as usize)
.map_err(warp_utils::reject::beacon_chain_error)?
.ok_or_else(|| {
warp_utils::reject::custom_bad_request(format!(
"unknown validator index {}",
*i
))
})?;
Ok((*i, pubkey))
})
.collect::<Result<Vec<_>, warp::Rejection>>()?;
// Converts the internal Lighthouse `AttestationDuty` struct into an
// API-conforming `AttesterData` struct.
let convert = |validator_index: u64,
pubkey: PublicKey,
duty: AttestationDuty|
-> api_types::AttesterData {
api_types::AttesterData {
pubkey: pubkey.into(),
validator_index,
committees_at_slot: duty.committees_at_slot,
committee_index: duty.index,
committee_length: duty.committee_len as u64,
validator_committee_index: duty.committee_position as u64,
slot: duty.slot,
}
};
// Here we have two paths:
//
// ## Fast
//
// If the request epoch is the current epoch, use the cached beacon chain
// method.
//
// ## Slow
//
// If the request epoch is prior to the current epoch, load a beacon state from
// disk
//
// The idea is to stop historical requests from washing out the cache on the
// beacon chain, whilst allowing a VC to request duties quickly.
let (duties, dependent_root) = if epoch == current_epoch {
// Fast path.
let duties = pubkeys
.into_iter()
// Exclude indices which do not represent a known public key and a
// validator duty.
.filter_map(|(i, pubkey)| {
Some(
chain
.validator_attestation_duty(i as usize, epoch)
.transpose()?
.map_err(warp_utils::reject::beacon_chain_error)
.map(|duty| convert(i, pubkey, duty)),
)
})
.collect::<Result<Vec<_>, warp::Rejection>>()?;
let dependent_root_slot =
(epoch - 1).start_slot(T::EthSpec::slots_per_epoch()) - 1;
let dependent_root = if dependent_root_slot
> chain
.best_slot()
.map_err(warp_utils::reject::beacon_chain_error)?
{
chain
.head_beacon_block_root()
.map_err(warp_utils::reject::beacon_chain_error)?
} else {
chain
.root_at_slot(dependent_root_slot)
.map_err(warp_utils::reject::beacon_chain_error)?
.unwrap_or(chain.genesis_block_root)
};
(duties, dependent_root)
} else {
// If the head state is equal to or earlier than the request epoch, use it.
let mut state = chain
.with_head(|head| {
if head.beacon_state.current_epoch() <= epoch {
Ok(Some(
head.beacon_state
.clone_with(CloneConfig::committee_caches_only()),
))
} else {
Ok(None)
}
})
.map_err(warp_utils::reject::beacon_chain_error)?
.map(Result::Ok)
.unwrap_or_else(|| {
StateId::slot(epoch.start_slot(T::EthSpec::slots_per_epoch()))
.state(&chain)
})?;
// Only skip forward to the epoch prior to the request, since we have a
// one-epoch look-ahead on shuffling.
while state
.next_epoch()
.map_err(warp_utils::reject::beacon_state_error)?
< epoch
{
// Don't calculate state roots since they aren't required for calculating
// shuffling (achieved by providing Hash256::zero()).
per_slot_processing(&mut state, Some(Hash256::zero()), &chain.spec)
.map_err(warp_utils::reject::slot_processing_error)?;
}
let relative_epoch =
RelativeEpoch::from_epoch(state.current_epoch(), epoch).map_err(
|e| {
warp_utils::reject::custom_server_error(format!(
"unable to obtain suitable state: {:?}",
e
))
},
)?;
state
.build_committee_cache(relative_epoch, &chain.spec)
.map_err(warp_utils::reject::beacon_state_error)?;
let duties = pubkeys
.into_iter()
.filter_map(|(i, pubkey)| {
Some(
state
.get_attestation_duties(i as usize, relative_epoch)
.transpose()?
.map_err(warp_utils::reject::beacon_state_error)
.map(|duty| convert(i, pubkey, duty)),
)
})
.collect::<Result<Vec<_>, warp::Rejection>>()?;
let dependent_root_slot =
(epoch - 1).start_slot(T::EthSpec::slots_per_epoch()) - 1;
let dependent_root = if dependent_root_slot
> chain
.best_slot()
.map_err(warp_utils::reject::beacon_chain_error)?
{
chain
.head_beacon_block_root()
.map_err(warp_utils::reject::beacon_chain_error)?
} else {
chain
.root_at_slot(dependent_root_slot)
.map_err(warp_utils::reject::beacon_chain_error)?
.unwrap_or(chain.genesis_block_root)
};
(duties, dependent_root)
};
Ok(api_types::DutiesResponse {
dependent_root,
data: duties,
})
attester_duties::attester_duties(epoch, &indices.0, &chain)
})
},
);
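The skip-slot `while` loop removed in the hunk above (the one calling `per_slot_processing` with `Hash256::zero()`) is the pattern that `state_processing::state_advance` now deduplicates. A self-contained sketch of the idea, using hypothetical stand-in types rather than the real `BeaconState`:

```rust
/// Stand-in for a beacon state: only the fields needed to show the pattern.
struct State {
    slot: u64,
    latest_state_root: [u8; 32],
}

/// Sketch of the state-advance idea: apply empty-slot transitions until the
/// target slot, threading a state root into each one. Passing a zero root
/// (as the removed handler did) is fine when only the shuffling is needed,
/// but it poisons any root later read back out of the state -- the bug this
/// PR fixed was call sites forgetting a real root when one was required.
fn partial_state_advance(state: &mut State, target_slot: u64) {
    while state.slot < target_slot {
        // Zero root: cheap, valid only for shuffling-style queries.
        state.latest_state_root = [0; 32];
        state.slot += 1;
    }
}

fn main() {
    let mut state = State { slot: 64, latest_state_root: [1; 32] };
    partial_state_advance(&mut state, 96);
    assert_eq!(state.slot, 96);
}
```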