Merge branch 'unstable' into vc-fallback

This commit is contained in:
Mac L
2023-12-01 12:52:54 +11:00
120 changed files with 4745 additions and 2348 deletions

View File

@@ -61,3 +61,4 @@ malloc_utils = { workspace = true }
sysinfo = { workspace = true }
system_health = { path = "../common/system_health" }
logging = { workspace = true }
strum = { workspace = true }

View File

@@ -1,4 +1,4 @@
use crate::beacon_node_fallback::BeaconNodeFallback;
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use crate::{
duties_service::{DutiesService, DutyAndProof},
http_metrics::metrics,
@@ -428,7 +428,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
// Post the attestations to the BN.
match self
.beacon_nodes
.first_success(|beacon_node| async move {
.request(ApiTopic::Attestations, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_POST],

View File

@@ -22,6 +22,7 @@ use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
use strum::{EnumString, EnumVariantNames};
use tokio::{sync::RwLock, time::sleep};
use types::{ChainSpec, Config as ConfigSpec, EthSpec, Slot};
@@ -342,9 +343,9 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
#[derive(Clone, Debug)]
pub struct BeaconNodeFallback<T, E> {
pub candidates: Arc<RwLock<Vec<CandidateBeaconNode<E>>>>,
disable_run_on_all: bool,
distance_tiers: BeaconNodeSyncDistanceTiers,
slot_clock: Option<T>,
broadcast_topics: Vec<ApiTopic>,
spec: ChainSpec,
log: Logger,
}
@@ -353,15 +354,16 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
pub fn new(
candidates: Vec<CandidateBeaconNode<E>>,
config: Config,
broadcast_topics: Vec<ApiTopic>,
spec: ChainSpec,
log: Logger,
) -> Self {
let distance_tiers = BeaconNodeSyncDistanceTiers::from_config(&config);
Self {
candidates: Arc::new(RwLock::new(candidates)),
disable_run_on_all: config.disable_run_on_all,
distance_tiers,
slot_clock: None,
broadcast_topics,
spec,
log,
}
@@ -587,7 +589,7 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
/// It returns a list of errors along with the beacon node id that failed for `func`.
/// Since this ignores the actual result of `func`, this function should only be used for beacon
/// node calls whose results we do not care about, only that they completed successfully.
pub async fn run_on_all<F, O, Err, R>(&self, func: F) -> Result<(), Errors<Err>>
pub async fn broadcast<F, O, Err, R>(&self, func: F) -> Result<(), Errors<Err>>
where
F: Fn(BeaconNodeHttpClient) -> R,
R: Future<Output = Result<O, Err>>,
@@ -635,32 +637,61 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
}
/// Call `func` on first beacon node that returns success or on all beacon nodes
/// depending on the value of `disable_run_on_all`.
pub async fn run<F, Err, R>(&self, func: F) -> Result<(), Errors<Err>>
/// depending on the `topic` and configuration.
pub async fn request<F, Err, R>(&self, topic: ApiTopic, func: F) -> Result<(), Errors<Err>>
where
F: Fn(BeaconNodeHttpClient) -> R,
R: Future<Output = Result<(), Err>>,
Err: Debug,
{
if self.disable_run_on_all {
if self.broadcast_topics.contains(&topic) {
self.broadcast(func).await
} else {
self.first_success(func).await?;
Ok(())
} else {
self.run_on_all(func).await
}
}
}
/// Serves as a cue for `BeaconNodeFallback` to tell which requests need to be broadcasted.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize, EnumString, EnumVariantNames)]
#[strum(serialize_all = "kebab-case")]
pub enum ApiTopic {
Attestations,
Blocks,
Subscriptions,
SyncCommittee,
}
impl ApiTopic {
pub fn all() -> Vec<ApiTopic> {
use ApiTopic::*;
vec![Attestations, Blocks, Subscriptions, SyncCommittee]
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::beacon_node_health::BeaconNodeHealthTier;
use crate::SensitiveUrl;
use eth2::Timeouts;
use std::str::FromStr;
use strum::VariantNames;
use types::{MainnetEthSpec, Slot};
type E = MainnetEthSpec;
#[test]
fn api_topic_all() {
let all = ApiTopic::all();
assert_eq!(all.len(), ApiTopic::VARIANTS.len());
assert!(ApiTopic::VARIANTS
.iter()
.map(|topic| ApiTopic::from_str(topic).unwrap())
.eq(all.into_iter()));
}
#[test]
fn check_candidate_order() {
// These fields is irrelvant for sorting. They are set to arbitrary values.

View File

@@ -1,6 +1,8 @@
use crate::beacon_node_fallback::{Error as FallbackError, Errors};
use crate::{
beacon_node_fallback::BeaconNodeFallback, determine_graffiti, graffiti_file::GraffitiFile,
beacon_node_fallback::{ApiTopic, BeaconNodeFallback},
determine_graffiti,
graffiti_file::GraffitiFile,
};
use crate::{
http_metrics::metrics,
@@ -18,7 +20,6 @@ use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;
use types::{
AbstractExecPayload, BlindedPayload, BlockType, EthSpec, FullPayload, Graffiti, PublicKeyBytes,
Slot,
@@ -54,7 +55,6 @@ pub struct BlockServiceBuilder<T, E: EthSpec> {
context: Option<RuntimeContext<E>>,
graffiti: Option<Graffiti>,
graffiti_file: Option<GraffitiFile>,
block_delay: Option<Duration>,
}
impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
@@ -67,7 +67,6 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
context: None,
graffiti: None,
graffiti_file: None,
block_delay: None,
}
}
@@ -106,11 +105,6 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
self
}
pub fn block_delay(mut self, block_delay: Option<Duration>) -> Self {
self.block_delay = block_delay;
self
}
pub fn build(self) -> Result<BlockService<T, E>, String> {
Ok(BlockService {
inner: Arc::new(Inner {
@@ -129,7 +123,6 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
proposer_nodes: self.proposer_nodes,
graffiti: self.graffiti,
graffiti_file: self.graffiti_file,
block_delay: self.block_delay,
}),
})
}
@@ -144,31 +137,29 @@ pub struct ProposerFallback<T, E: EthSpec> {
impl<T: SlotClock, E: EthSpec> ProposerFallback<T, E> {
// Try `func` on `self.proposer_nodes` first. If that doesn't work, try `self.beacon_nodes`.
pub async fn first_success_try_proposers_first<F, O, Err, R>(
&self,
func: F,
) -> Result<O, Errors<Err>>
pub async fn request_proposers_first<F, Err, R>(&self, func: F) -> Result<(), Errors<Err>>
where
F: Fn(BeaconNodeHttpClient) -> R + Clone,
R: Future<Output = Result<O, Err>>,
R: Future<Output = Result<(), Err>>,
Err: Debug,
{
// If there are proposer nodes, try calling `func` on them and return early if they are successful.
if let Some(proposer_nodes) = &self.proposer_nodes {
if let Ok(result) = proposer_nodes.first_success(func.clone()).await {
return Ok(result);
if proposer_nodes
.request(ApiTopic::Blocks, func.clone())
.await
.is_ok()
{
return Ok(());
}
}
// If the proposer nodes failed, try on the non-proposer nodes.
self.beacon_nodes.first_success(func).await
self.beacon_nodes.request(ApiTopic::Blocks, func).await
}
// Try `func` on `self.beacon_nodes` first. If that doesn't work, try `self.proposer_nodes`.
pub async fn first_success_try_proposers_last<F, O, Err, R>(
&self,
func: F,
) -> Result<O, Errors<Err>>
pub async fn request_proposers_last<F, O, Err, R>(&self, func: F) -> Result<O, Errors<Err>>
where
F: Fn(BeaconNodeHttpClient) -> R + Clone,
R: Future<Output = Result<O, Err>>,
@@ -197,7 +188,6 @@ pub struct Inner<T, E: EthSpec> {
context: RuntimeContext<E>,
graffiti: Option<Graffiti>,
graffiti_file: Option<GraffitiFile>,
block_delay: Option<Duration>,
}
/// Attempts to produce attestations for any block producer(s) at the start of the epoch.
@@ -241,18 +231,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
executor.spawn(
async move {
while let Some(notif) = notification_rx.recv().await {
let service = self.clone();
if let Some(delay) = service.block_delay {
debug!(
service.context.log(),
"Delaying block production by {}ms",
delay.as_millis()
);
sleep(delay).await;
}
service.do_update(notif).await.ok();
self.do_update(notif).await.ok();
}
debug!(log, "Block service shutting down");
},
@@ -457,8 +436,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
// Try the proposer nodes last, since it's likely that they don't have a
// great view of attestations on the network.
let block_contents = proposer_fallback
.first_success_try_proposers_last(|beacon_node| {
let beacon_node = beacon_node;
.request_proposers_last(move |beacon_node| {
Self::get_validator_block(
beacon_node,
slot,
@@ -547,8 +525,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
// protect them from DoS attacks and they're most likely to successfully
// publish a block.
proposer_fallback
.first_success_try_proposers_first(|beacon_node| async {
let beacon_node = beacon_node;
.request_proposers_first(|beacon_node| async {
self.publish_signed_block_contents::<Payload>(&signed_block_contents, beacon_node)
.await
})

View File

@@ -8,15 +8,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
"When connected to a beacon node, performs the duties of a staked \
validator (e.g., proposing blocks and attestations).",
)
// This argument is deprecated, use `--beacon-nodes` instead.
.arg(
Arg::with_name("beacon-node")
.long("beacon-node")
.value_name("NETWORK_ADDRESS")
.help("Deprecated. Use --beacon-nodes.")
.takes_value(true)
.conflicts_with("beacon-nodes"),
)
.arg(
Arg::with_name("beacon-nodes")
.long("beacon-nodes")
@@ -35,24 +26,28 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
)
.takes_value(true),
)
// TODO remove this flag in a future release
.arg(
Arg::with_name("disable-run-on-all")
.long("disable-run-on-all")
.value_name("DISABLE_RUN_ON_ALL")
.help("By default, Lighthouse publishes attestation, sync committee subscriptions \
.help("DEPRECATED. Use --broadcast. \
By default, Lighthouse publishes attestation, sync committee subscriptions \
and proposer preparation messages to all beacon nodes provided in the \
`--beacon-nodes flag`. This option changes that behaviour such that these \
api calls only go out to the first available and synced beacon node")
.takes_value(false)
.takes_value(false),
)
// This argument is deprecated, use `--beacon-nodes` instead.
.arg(
Arg::with_name("server")
.long("server")
.value_name("NETWORK_ADDRESS")
.help("Deprecated. Use --beacon-nodes.")
.takes_value(true)
.conflicts_with_all(&["beacon-node", "beacon-nodes"]),
Arg::with_name("broadcast")
.long("broadcast")
.value_name("API_TOPICS")
.help("Comma-separated list of beacon API topics to broadcast to all beacon nodes. \
Possible values are: none, attestations, blocks, subscriptions, \
sync-committee. Default (when flag is omitted) is to broadcast \
subscriptions only."
)
.takes_value(true),
)
.arg(
Arg::with_name("validators-dir")
@@ -80,13 +75,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.takes_value(true)
.conflicts_with("datadir")
)
.arg(
Arg::with_name("delete-lockfiles")
.long("delete-lockfiles")
.help(
"DEPRECATED. This flag does nothing and will be removed in a future release."
)
)
.arg(
Arg::with_name("init-slashing-protection")
.long("init-slashing-protection")
@@ -106,11 +94,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
will need to be manually added to the validator_definitions.yml file."
)
)
.arg(
Arg::with_name("allow-unsynced")
.long("allow-unsynced")
.help("DEPRECATED: this flag does nothing"),
)
.arg(
Arg::with_name("use-long-timeouts")
.long("use-long-timeouts")
@@ -319,18 +302,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
headers during proposals and will sign over headers. Useful for outsourcing \
execution payload construction during proposals.")
.takes_value(false),
).arg(
Arg::with_name("strict-fee-recipient")
.long("strict-fee-recipient")
.help("[DEPRECATED] If this flag is set, Lighthouse will refuse to sign any block whose \
`fee_recipient` does not match the `suggested_fee_recipient` sent by this validator. \
This applies to both the normal block proposal flow, as well as block proposals \
through the builder API. Proposals through the builder API are more likely to have a \
discrepancy in `fee_recipient` so you should be aware of how your connected relay \
sends proposer payments before using this flag. If this flag is used, a fee recipient \
mismatch in the builder API flow will result in a fallback to the local execution engine \
for payload construction, where a strict fee recipient check will still be applied.")
.takes_value(false),
)
.arg(
Arg::with_name("builder-registration-timestamp-override")
@@ -395,16 +366,4 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
Beacon Nodes. The range falls immediately after the end of the `small` range.")
.takes_value(true)
)
/*
* Experimental/development options.
*/
.arg(
Arg::with_name("block-delay-ms")
.long("block-delay-ms")
.value_name("MILLIS")
.hidden(true)
.help("Time to delay block production from the start of the slot. Should only be \
used for testing.")
.takes_value(true),
)
}

View File

@@ -1,3 +1,4 @@
use crate::beacon_node_fallback::ApiTopic;
use crate::graffiti_file::GraffitiFile;
use crate::{beacon_node_fallback, http_api, http_metrics};
use clap::ArgMatches;
@@ -13,7 +14,6 @@ use slog::{info, warn, Logger};
use std::fs;
use std::net::IpAddr;
use std::path::PathBuf;
use std::time::Duration;
use types::{Address, GRAFFITI_BYTES_LEN};
pub const DEFAULT_BEACON_NODE: &str = "http://localhost:5052/";
@@ -71,10 +71,8 @@ pub struct Config {
/// A list of custom certificates that the validator client will additionally use when
/// connecting to a beacon node over SSL/TLS.
pub beacon_nodes_tls_certs: Option<Vec<PathBuf>>,
/// Delay from the start of the slot to wait before publishing a block.
///
/// This is *not* recommended in prod and should only be used for testing.
pub block_delay: Option<Duration>,
/// Enables broadcasting of various requests (by topic) to all beacon nodes.
pub broadcast_topics: Vec<ApiTopic>,
/// Enables a service which attempts to measure latency between the VC and BNs.
pub enable_latency_measurement_service: bool,
/// Defines the number of validators per `validator/register_validator` request sent to the BN.
@@ -114,10 +112,10 @@ impl Default for Config {
enable_doppelganger_protection: false,
enable_high_validator_count_metrics: false,
beacon_nodes_tls_certs: None,
block_delay: None,
builder_proposals: false,
builder_registration_timestamp_override: None,
gas_limit: None,
broadcast_topics: vec![ApiTopic::Subscriptions],
enable_latency_measurement_service: true,
validator_registration_batch_size: 500,
}
@@ -171,27 +169,6 @@ impl Config {
.collect::<Result<_, _>>()
.map_err(|e| format!("Unable to parse beacon node URL: {:?}", e))?;
}
// To be deprecated.
else if let Some(beacon_node) = parse_optional::<String>(cli_args, "beacon-node")? {
warn!(
log,
"The --beacon-node flag is deprecated";
"msg" => "please use --beacon-nodes instead"
);
config.beacon_nodes = vec![SensitiveUrl::parse(&beacon_node)
.map_err(|e| format!("Unable to parse beacon node URL: {:?}", e))?];
}
// To be deprecated.
else if let Some(server) = parse_optional::<String>(cli_args, "server")? {
warn!(
log,
"The --server flag is deprecated";
"msg" => "please use --beacon-nodes instead"
);
config.beacon_nodes = vec![SensitiveUrl::parse(&server)
.map_err(|e| format!("Unable to parse beacon node URL: {:?}", e))?];
}
if let Some(proposer_nodes) = parse_optional::<String>(cli_args, "proposer_nodes")? {
config.proposer_nodes = proposer_nodes
.split(',')
@@ -200,21 +177,6 @@ impl Config {
.map_err(|e| format!("Unable to parse proposer node URL: {:?}", e))?;
}
if cli_args.is_present("delete-lockfiles") {
warn!(
log,
"The --delete-lockfiles flag is deprecated";
"msg" => "it is no longer necessary, and no longer has any effect",
);
}
if cli_args.is_present("allow-unsynced") {
warn!(
log,
"The --allow-unsynced flag is deprecated";
"msg" => "it no longer has any effect",
);
}
config.disable_auto_discover = cli_args.is_present("disable-auto-discover");
config.init_slashing_protection = cli_args.is_present("init-slashing-protection");
config.use_long_timeouts = cli_args.is_present("use-long-timeouts");
@@ -257,6 +219,26 @@ impl Config {
config.beacon_nodes_tls_certs = Some(tls_certs.split(',').map(PathBuf::from).collect());
}
if cli_args.is_present("disable-run-on-all") {
warn!(
log,
"The --disable-run-on-all flag is deprecated";
"msg" => "please use --broadcast instead"
);
config.broadcast_topics = vec![];
}
if let Some(broadcast_topics) = cli_args.value_of("broadcast") {
config.broadcast_topics = broadcast_topics
.split(',')
.filter(|t| *t != "none")
.map(|t| {
t.trim()
.parse::<ApiTopic>()
.map_err(|_| format!("Unknown API topic to broadcast: {t}"))
})
.collect::<Result<_, _>>()?;
}
/*
* Beacon node fallback
*/
@@ -411,14 +393,6 @@ impl Config {
);
}
if cli_args.is_present("strict-fee-recipient") {
warn!(
log,
"The flag `--strict-fee-recipient` has been deprecated due to a bug causing \
missed proposals. The flag will be ignored."
);
}
config.enable_latency_measurement_service =
parse_optional(cli_args, "latency-measurement-service")?.unwrap_or(true);
@@ -428,13 +402,6 @@ impl Config {
return Err("validator-registration-batch-size cannot be 0".to_string());
}
/*
* Experimental
*/
if let Some(delay_ms) = parse_optional::<u64>(cli_args, "block-delay-ms")? {
config.block_delay = Some(Duration::from_millis(delay_ms));
}
Ok(config)
}
}

View File

@@ -8,7 +8,7 @@
mod sync;
use crate::beacon_node_fallback::BeaconNodeFallback;
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use crate::http_metrics::metrics::{get_int_gauge, set_int_gauge, ATTESTATION_DUTY};
use crate::{
block_service::BlockServiceNotification,
@@ -696,7 +696,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
let subscriptions_ref = &subscriptions;
if let Err(e) = duties_service
.beacon_nodes
.run(|beacon_node| async move {
.request(ApiTopic::Subscriptions, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::SUBSCRIPTIONS_HTTP_POST],

View File

@@ -20,6 +20,7 @@ pub mod http_api;
pub mod initialized_validators;
pub mod validator_store;
pub use beacon_node_fallback::ApiTopic;
pub use cli::cli_app;
pub use config::Config;
use initialized_validators::InitializedValidators;
@@ -373,6 +374,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
let mut beacon_nodes: BeaconNodeFallback<_, T> = BeaconNodeFallback::new(
candidates,
config.beacon_node_fallback,
config.broadcast_topics.clone(),
context.eth2_config.spec.clone(),
log.clone(),
);
@@ -380,6 +382,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
let mut proposer_nodes: BeaconNodeFallback<_, T> = BeaconNodeFallback::new(
proposer_candidates,
config.beacon_node_fallback,
config.broadcast_topics.clone(),
context.eth2_config.spec.clone(),
log.clone(),
);
@@ -474,8 +477,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
.beacon_nodes(beacon_nodes.clone())
.runtime_context(context.service_context("block".into()))
.graffiti(config.graffiti)
.graffiti_file(config.graffiti_file.clone())
.block_delay(config.block_delay);
.graffiti_file(config.graffiti_file.clone());
// If we have proposer nodes, add them to the block service builder.
if proposer_nodes_num > 0 {

View File

@@ -1,4 +1,4 @@
use crate::beacon_node_fallback::BeaconNodeFallback;
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use crate::validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore};
use bls::PublicKeyBytes;
use environment::RuntimeContext;
@@ -341,7 +341,7 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
let preparation_entries = preparation_data.as_slice();
match self
.beacon_nodes
.run(|beacon_node| async move {
.request(ApiTopic::Subscriptions, |beacon_node| async move {
beacon_node
.post_validator_prepare_beacon_proposer(preparation_entries)
.await

View File

@@ -1,4 +1,4 @@
use crate::beacon_node_fallback::BeaconNodeFallback;
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use crate::{
duties_service::DutiesService,
validator_store::{Error as ValidatorStoreError, ValidatorStore},
@@ -296,7 +296,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
.collect::<Vec<_>>();
self.beacon_nodes
.first_success(|beacon_node| async move {
.request(ApiTopic::SyncCommittee, |beacon_node| async move {
beacon_node
.post_beacon_pool_sync_committee_signatures(committee_signatures)
.await
@@ -579,7 +579,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
if let Err(e) = self
.beacon_nodes
.run(|beacon_node| async move {
.request(ApiTopic::Subscriptions, |beacon_node| async move {
beacon_node
.post_validator_sync_committee_subscriptions(subscriptions_slice)
.await