Merge branch 'unstable' into dvt

This commit is contained in:
chonghe
2025-03-31 14:59:07 +08:00
committed by GitHub
494 changed files with 14225 additions and 37939 deletions

View File

@@ -23,7 +23,6 @@ use initialized_validators::Error::UnableToOpenVotingKeystore;
use notifier::spawn_notifier;
use parking_lot::RwLock;
use reqwest::Certificate;
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use slot_clock::SystemTimeSlotClock;
use std::fs::File;
@@ -37,6 +36,7 @@ use tokio::{
sync::mpsc,
time::{sleep, Duration},
};
use tracing::{debug, error, info, warn};
use types::{EthSpec, Hash256};
use validator_http_api::ApiSecret;
use validator_services::{
@@ -99,7 +99,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
cli_args: &ArgMatches,
validator_client_config: &ValidatorClient,
) -> Result<Self, String> {
let config = Config::from_cli(cli_args, validator_client_config, context.log())
let config = Config::from_cli(cli_args, validator_client_config)
.map_err(|e| format!("Unable to initialize config: {}", e))?;
Self::new(context, config).await
}
@@ -107,8 +107,6 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
/// Instantiates the validator client, _without_ starting the timers to trigger block
/// and attestation production.
pub async fn new(context: RuntimeContext<E>, config: Config) -> Result<Self, String> {
let log = context.log().clone();
// Attempt to raise soft fd limit. The behavior is OS specific:
// `linux` - raise soft fd limit to hard
// `macos` - raise soft fd limit to `min(kernel limit, hard fd limit)`
@@ -116,25 +114,20 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
match fdlimit::raise_fd_limit().map_err(|e| format!("Unable to raise fd limit: {}", e))? {
fdlimit::Outcome::LimitRaised { from, to } => {
debug!(
log,
"Raised soft open file descriptor resource limit";
"old_limit" => from,
"new_limit" => to
old_limit = from,
new_limit = to,
"Raised soft open file descriptor resource limit"
);
}
fdlimit::Outcome::Unsupported => {
debug!(
log,
"Raising soft open file descriptor resource limit is not supported"
);
debug!("Raising soft open file descriptor resource limit is not supported");
}
};
info!(
log,
"Starting validator client";
"beacon_nodes" => format!("{:?}", &config.beacon_nodes),
"validator_dir" => format!("{:?}", config.validator_dir),
beacon_nodes = ?config.beacon_nodes,
validator_dir = ?config.validator_dir,
"Starting validator client"
);
// Optionally start the metrics server.
@@ -149,7 +142,6 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
Arc::new(validator_http_metrics::Context {
config: config.http_metrics.clone(),
shared: RwLock::new(shared),
log: log.clone(),
});
let exit = context.executor.exit();
@@ -164,15 +156,14 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
Some(ctx)
} else {
info!(log, "HTTP metrics server is disabled");
info!("HTTP metrics server is disabled");
None
};
// Start the explorer client which periodically sends validator process
// and system metrics to the configured endpoint.
if let Some(monitoring_config) = &config.monitoring_api {
let monitoring_client =
MonitoringHttpClient::new(monitoring_config, context.log().clone())?;
let monitoring_client = MonitoringHttpClient::new(monitoring_config)?;
monitoring_client.auto_update(
context.executor.clone(),
vec![ProcessType::Validator, ProcessType::System],
@@ -184,7 +175,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
if !config.disable_auto_discover {
let new_validators = validator_defs
.discover_local_keystores(&config.validator_dir, &config.secrets_dir, &log)
.discover_local_keystores(&config.validator_dir, &config.secrets_dir)
.map_err(|e| format!("Unable to discover local validator keystores: {:?}", e))?;
validator_defs.save(&config.validator_dir).map_err(|e| {
format!(
@@ -192,18 +183,13 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
e
)
})?;
info!(
log,
"Completed validator discovery";
"new_validators" => new_validators,
);
info!(new_validators, "Completed validator discovery");
}
let validators = InitializedValidators::from_definitions(
validator_defs,
config.validator_dir.clone(),
config.initialized_validators.clone(),
log.clone(),
)
.await
.map_err(|e| {
@@ -220,17 +206,17 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
let voting_pubkeys: Vec<_> = validators.iter_voting_pubkeys().collect();
info!(
log,
"Initialized validators";
"disabled" => validators.num_total().saturating_sub(validators.num_enabled()),
"enabled" => validators.num_enabled(),
disabled = validators
.num_total()
.saturating_sub(validators.num_enabled()),
enabled = validators.num_enabled(),
"Initialized validators"
);
if voting_pubkeys.is_empty() {
warn!(
log,
"No enabled validators";
"hint" => "create validators via the API, or the `lighthouse account` CLI command"
hint = "create validators via the API, or the `lighthouse account` CLI command",
"No enabled validators"
);
}
@@ -305,10 +291,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
// Use quicker timeouts if a fallback beacon node exists.
let timeouts = if i < last_beacon_node_index && !config.use_long_timeouts {
info!(
log,
"Fallback endpoints are available, using optimized timeouts.";
);
info!("Fallback endpoints are available, using optimized timeouts.");
Timeouts {
attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT,
attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
@@ -330,7 +313,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
get_validator_block: slot_duration / HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT,
}
} else {
Timeouts::set_all(slot_duration)
Timeouts::set_all(slot_duration.saturating_mul(config.long_timeouts_multiplier))
};
Ok(BeaconNodeHttpClient::from_components(
@@ -394,7 +377,6 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
config.beacon_node_fallback,
config.broadcast_topics.clone(),
context.eth2_config.spec.clone(),
log.clone(),
);
let mut proposer_nodes: BeaconNodeFallback<_, E> = BeaconNodeFallback::new(
@@ -402,12 +384,11 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
config.beacon_node_fallback,
config.broadcast_topics.clone(),
context.eth2_config.spec.clone(),
log.clone(),
);
// Perform some potentially long-running initialization tasks.
let (genesis_time, genesis_validators_root) = tokio::select! {
tuple = init_from_beacon_node(&beacon_nodes, &proposer_nodes, &context) => tuple?,
tuple = init_from_beacon_node(&beacon_nodes, &proposer_nodes) => tuple?,
() = context.executor.exit() => return Err("Shutting down".to_string())
};
@@ -432,12 +413,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
start_fallback_updater_service(context.clone(), proposer_nodes.clone())?;
let doppelganger_service = if config.enable_doppelganger_protection {
Some(Arc::new(DoppelgangerService::new(
context
.service_context(DOPPELGANGER_SERVICE_NAME.into())
.log()
.clone(),
)))
Some(Arc::new(DoppelgangerService::default()))
} else {
None
};
@@ -451,16 +427,14 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
slot_clock.clone(),
&config.validator_store,
context.executor.clone(),
log.clone(),
));
// Ensure all validators are registered in doppelganger protection.
validator_store.register_all_in_doppelganger_protection_if_enabled()?;
info!(
log,
"Loaded validator keypair store";
"voting_validators" => validator_store.num_voting_validators()
voting_validators = validator_store.num_voting_validators(),
"Loaded validator keypair store"
);
// Perform pruning of the slashing protection database on start-up. In case the database is
@@ -483,6 +457,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
context: duties_context,
enable_high_validator_count_metrics: config.enable_high_validator_count_metrics,
distributed: config.distributed,
disable_attesting: config.disable_attesting,
});
// Update the metrics server.
@@ -512,6 +487,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
.validator_store(validator_store.clone())
.beacon_nodes(beacon_nodes.clone())
.runtime_context(context.service_context("attestation".into()))
.disable(config.disable_attesting)
.build()?;
let preparation_service = PreparationServiceBuilder::new()
@@ -554,7 +530,6 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
// whole epoch!
let channel_capacity = E::slots_per_epoch() as usize;
let (block_service_tx, block_service_rx) = mpsc::channel(channel_capacity);
let log = self.context.log();
let api_secret = ApiSecret::create_or_open(&self.config.http_api.http_token_path)?;
@@ -572,7 +547,6 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
config: self.config.http_api.clone(),
sse_logging_components: self.context.sse_logging_components.clone(),
slot_clock: self.slot_clock.clone(),
log: log.clone(),
_phantom: PhantomData,
});
@@ -588,12 +562,12 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
Some(listen_addr)
} else {
info!(log, "HTTP API server is disabled");
info!("HTTP API server is disabled");
None
};
// Wait until genesis has occurred.
wait_for_genesis(&self.beacon_nodes, self.genesis_time, &self.context).await?;
wait_for_genesis(&self.beacon_nodes, self.genesis_time).await?;
duties_service::start_update_service(self.duties_service.clone(), block_service_tx);
@@ -628,7 +602,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
)
.map_err(|e| format!("Unable to start doppelganger service: {}", e))?
} else {
info!(log, "Doppelganger protection disabled.")
info!("Doppelganger protection disabled.")
}
spawn_notifier(self).map_err(|e| format!("Failed to start notifier: {}", e))?;
@@ -648,7 +622,6 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
async fn init_from_beacon_node<E: EthSpec>(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
proposer_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
context: &RuntimeContext<E>,
) -> Result<(u64, Hash256), String> {
loop {
beacon_nodes.update_all_candidates().await;
@@ -662,41 +635,37 @@ async fn init_from_beacon_node<E: EthSpec>(
if proposer_total > 0 && proposer_available == 0 {
warn!(
context.log(),
"Unable to connect to a proposer node";
"retry in" => format!("{} seconds", RETRY_DELAY.as_secs()),
"total_proposers" => proposer_total,
"available_proposers" => proposer_available,
"total_beacon_nodes" => num_total,
"available_beacon_nodes" => num_available,
retry_in = format!("{} seconds", RETRY_DELAY.as_secs()),
total_proposers = proposer_total,
available_proposers = proposer_available,
total_beacon_nodes = num_total,
available_beacon_nodes = num_available,
"Unable to connect to a proposer node"
);
}
if num_available > 0 && proposer_available == 0 {
info!(
context.log(),
"Initialized beacon node connections";
"total" => num_total,
"available" => num_available,
total = num_total,
available = num_available,
"Initialized beacon node connections"
);
break;
} else if num_available > 0 {
info!(
context.log(),
"Initialized beacon node connections";
"total" => num_total,
"available" => num_available,
"proposers_available" => proposer_available,
"proposers_total" => proposer_total,
total = num_total,
available = num_available,
proposer_available,
proposer_total,
"Initialized beacon node connections"
);
break;
} else {
warn!(
context.log(),
"Unable to connect to a beacon node";
"retry in" => format!("{} seconds", RETRY_DELAY.as_secs()),
"total" => num_total,
"available" => num_available,
retry_in = format!("{} seconds", RETRY_DELAY.as_secs()),
total = num_total,
available = num_available,
"Unable to connect to a beacon node"
);
sleep(RETRY_DELAY).await;
}
@@ -717,15 +686,11 @@ async fn init_from_beacon_node<E: EthSpec>(
.filter_map(|(_, e)| e.request_failure())
.any(|e| e.status() == Some(StatusCode::NOT_FOUND))
{
info!(
context.log(),
"Waiting for genesis";
);
info!("Waiting for genesis");
} else {
error!(
context.log(),
"Errors polling beacon node";
"error" => %errors
%errors,
"Errors polling beacon node"
);
}
}
@@ -740,7 +705,6 @@ async fn init_from_beacon_node<E: EthSpec>(
async fn wait_for_genesis<E: EthSpec>(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
genesis_time: u64,
context: &RuntimeContext<E>,
) -> Result<(), String> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -754,28 +718,25 @@ async fn wait_for_genesis<E: EthSpec>(
// the slot clock.
if now < genesis_time {
info!(
context.log(),
"Starting node prior to genesis";
"seconds_to_wait" => (genesis_time - now).as_secs()
seconds_to_wait = (genesis_time - now).as_secs(),
"Starting node prior to genesis"
);
// Start polling the node for pre-genesis information, cancelling the polling as soon as the
// timer runs out.
tokio::select! {
result = poll_whilst_waiting_for_genesis(beacon_nodes, genesis_time, context.log()) => result?,
result = poll_whilst_waiting_for_genesis(beacon_nodes, genesis_time) => result?,
() = sleep(genesis_time - now) => ()
};
info!(
context.log(),
"Genesis has occurred";
"ms_since_genesis" => (genesis_time - now).as_millis()
ms_since_genesis = (genesis_time - now).as_millis(),
"Genesis has occurred"
);
} else {
info!(
context.log(),
"Genesis has already occurred";
"seconds_ago" => (now - genesis_time).as_secs()
seconds_ago = (now - genesis_time).as_secs(),
"Genesis has already occurred"
);
}
@@ -787,7 +748,6 @@ async fn wait_for_genesis<E: EthSpec>(
async fn poll_whilst_waiting_for_genesis<E: EthSpec>(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
genesis_time: Duration,
log: &Logger,
) -> Result<(), String> {
loop {
match beacon_nodes
@@ -801,19 +761,17 @@ async fn poll_whilst_waiting_for_genesis<E: EthSpec>(
if !is_staking {
error!(
log,
"Staking is disabled for beacon node";
"msg" => "this will caused missed duties",
"info" => "see the --staking CLI flag on the beacon node"
msg = "this will caused missed duties",
info = "see the --staking CLI flag on the beacon node",
"Staking is disabled for beacon node"
);
}
if now < genesis_time {
info!(
log,
"Waiting for genesis";
"bn_staking_enabled" => is_staking,
"seconds_to_wait" => (genesis_time - now).as_secs()
bn_staking_enabled = is_staking,
seconds_to_wait = (genesis_time - now).as_secs(),
"Waiting for genesis"
);
} else {
break Ok(());
@@ -821,9 +779,8 @@ async fn poll_whilst_waiting_for_genesis<E: EthSpec>(
}
Err(e) => {
error!(
log,
"Error polling beacon node";
"error" => %e
error = %e,
"Error polling beacon node"
);
}
}