mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-23 14:54:45 +00:00
Merge branch 'unstable' into validator-manager
This commit is contained in:
@@ -24,6 +24,7 @@ pub use config::Config;
|
||||
use initialized_validators::InitializedValidators;
|
||||
use lighthouse_metrics::set_gauge;
|
||||
use monitoring_api::{MonitoringHttpClient, ProcessType};
|
||||
use sensitive_url::SensitiveUrl;
|
||||
pub use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME};
|
||||
|
||||
use crate::beacon_node_fallback::{
|
||||
@@ -94,6 +95,7 @@ pub struct ProductionValidatorClient<T: EthSpec> {
|
||||
doppelganger_service: Option<Arc<DoppelgangerService>>,
|
||||
preparation_service: PreparationService<SystemTimeSlotClock, T>,
|
||||
validator_store: Arc<ValidatorStore<SystemTimeSlotClock, T>>,
|
||||
slot_clock: SystemTimeSlotClock,
|
||||
http_api_listen_addr: Option<SocketAddr>,
|
||||
config: Config,
|
||||
}
|
||||
@@ -262,60 +264,70 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
.checked_sub(1)
|
||||
.ok_or_else(|| "No beacon nodes defined.".to_string())?;
|
||||
|
||||
let beacon_node_setup = |x: (usize, &SensitiveUrl)| {
|
||||
let i = x.0;
|
||||
let url = x.1;
|
||||
let slot_duration = Duration::from_secs(context.eth2_config.spec.seconds_per_slot);
|
||||
|
||||
let mut beacon_node_http_client_builder = ClientBuilder::new();
|
||||
|
||||
// Add new custom root certificates if specified.
|
||||
if let Some(certificates) = &config.beacon_nodes_tls_certs {
|
||||
for cert in certificates {
|
||||
beacon_node_http_client_builder = beacon_node_http_client_builder
|
||||
.add_root_certificate(load_pem_certificate(cert)?);
|
||||
}
|
||||
}
|
||||
|
||||
let beacon_node_http_client = beacon_node_http_client_builder
|
||||
// Set default timeout to be the full slot duration.
|
||||
.timeout(slot_duration)
|
||||
.build()
|
||||
.map_err(|e| format!("Unable to build HTTP client: {:?}", e))?;
|
||||
|
||||
// Use quicker timeouts if a fallback beacon node exists.
|
||||
let timeouts = if i < last_beacon_node_index && !config.use_long_timeouts {
|
||||
info!(
|
||||
log,
|
||||
"Fallback endpoints are available, using optimized timeouts.";
|
||||
);
|
||||
Timeouts {
|
||||
attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT,
|
||||
attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
|
||||
liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
|
||||
proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
|
||||
proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,
|
||||
sync_committee_contribution: slot_duration
|
||||
/ HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT,
|
||||
sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT,
|
||||
get_beacon_blocks_ssz: slot_duration
|
||||
/ HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT,
|
||||
get_debug_beacon_states: slot_duration / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT,
|
||||
get_deposit_snapshot: slot_duration / HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT,
|
||||
}
|
||||
} else {
|
||||
Timeouts::set_all(slot_duration)
|
||||
};
|
||||
|
||||
Ok(BeaconNodeHttpClient::from_components(
|
||||
url.clone(),
|
||||
beacon_node_http_client,
|
||||
timeouts,
|
||||
))
|
||||
};
|
||||
|
||||
let beacon_nodes: Vec<BeaconNodeHttpClient> = config
|
||||
.beacon_nodes
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, url)| {
|
||||
let slot_duration = Duration::from_secs(context.eth2_config.spec.seconds_per_slot);
|
||||
.map(beacon_node_setup)
|
||||
.collect::<Result<Vec<BeaconNodeHttpClient>, String>>()?;
|
||||
|
||||
let mut beacon_node_http_client_builder = ClientBuilder::new();
|
||||
|
||||
// Add new custom root certificates if specified.
|
||||
if let Some(certificates) = &config.beacon_nodes_tls_certs {
|
||||
for cert in certificates {
|
||||
beacon_node_http_client_builder = beacon_node_http_client_builder
|
||||
.add_root_certificate(load_pem_certificate(cert)?);
|
||||
}
|
||||
}
|
||||
|
||||
let beacon_node_http_client = beacon_node_http_client_builder
|
||||
// Set default timeout to be the full slot duration.
|
||||
.timeout(slot_duration)
|
||||
.build()
|
||||
.map_err(|e| format!("Unable to build HTTP client: {:?}", e))?;
|
||||
|
||||
// Use quicker timeouts if a fallback beacon node exists.
|
||||
let timeouts = if i < last_beacon_node_index && !config.use_long_timeouts {
|
||||
info!(
|
||||
log,
|
||||
"Fallback endpoints are available, using optimized timeouts.";
|
||||
);
|
||||
Timeouts {
|
||||
attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT,
|
||||
attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
|
||||
liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
|
||||
proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
|
||||
proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,
|
||||
sync_committee_contribution: slot_duration
|
||||
/ HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT,
|
||||
sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT,
|
||||
get_beacon_blocks_ssz: slot_duration
|
||||
/ HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT,
|
||||
get_debug_beacon_states: slot_duration
|
||||
/ HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT,
|
||||
get_deposit_snapshot: slot_duration / HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT,
|
||||
}
|
||||
} else {
|
||||
Timeouts::set_all(slot_duration)
|
||||
};
|
||||
|
||||
Ok(BeaconNodeHttpClient::from_components(
|
||||
url.clone(),
|
||||
beacon_node_http_client,
|
||||
timeouts,
|
||||
))
|
||||
})
|
||||
let proposer_nodes: Vec<BeaconNodeHttpClient> = config
|
||||
.proposer_nodes
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(beacon_node_setup)
|
||||
.collect::<Result<Vec<BeaconNodeHttpClient>, String>>()?;
|
||||
|
||||
let num_nodes = beacon_nodes.len();
|
||||
@@ -324,6 +336,12 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
.map(CandidateBeaconNode::new)
|
||||
.collect();
|
||||
|
||||
let proposer_nodes_num = proposer_nodes.len();
|
||||
let proposer_candidates = proposer_nodes
|
||||
.into_iter()
|
||||
.map(CandidateBeaconNode::new)
|
||||
.collect();
|
||||
|
||||
// Set the count for beacon node fallbacks excluding the primary beacon node.
|
||||
set_gauge(
|
||||
&http_metrics::metrics::ETH2_FALLBACK_CONFIGURED,
|
||||
@@ -348,9 +366,16 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
log.clone(),
|
||||
);
|
||||
|
||||
let mut proposer_nodes: BeaconNodeFallback<_, T> = BeaconNodeFallback::new(
|
||||
proposer_candidates,
|
||||
config.disable_run_on_all,
|
||||
context.eth2_config.spec.clone(),
|
||||
log.clone(),
|
||||
);
|
||||
|
||||
// Perform some potentially long-running initialization tasks.
|
||||
let (genesis_time, genesis_validators_root) = tokio::select! {
|
||||
tuple = init_from_beacon_node(&beacon_nodes, &context) => tuple?,
|
||||
tuple = init_from_beacon_node(&beacon_nodes, &proposer_nodes, &context) => tuple?,
|
||||
() = context.executor.exit() => return Err("Shutting down".to_string())
|
||||
};
|
||||
|
||||
@@ -366,9 +391,14 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
);
|
||||
|
||||
beacon_nodes.set_slot_clock(slot_clock.clone());
|
||||
proposer_nodes.set_slot_clock(slot_clock.clone());
|
||||
|
||||
let beacon_nodes = Arc::new(beacon_nodes);
|
||||
start_fallback_updater_service(context.clone(), beacon_nodes.clone())?;
|
||||
|
||||
let proposer_nodes = Arc::new(proposer_nodes);
|
||||
start_fallback_updater_service(context.clone(), proposer_nodes.clone())?;
|
||||
|
||||
let doppelganger_service = if config.enable_doppelganger_protection {
|
||||
Some(Arc::new(DoppelgangerService::new(
|
||||
context
|
||||
@@ -432,15 +462,21 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
ctx.shared.write().duties_service = Some(duties_service.clone());
|
||||
}
|
||||
|
||||
let block_service = BlockServiceBuilder::new()
|
||||
let mut block_service_builder = BlockServiceBuilder::new()
|
||||
.slot_clock(slot_clock.clone())
|
||||
.validator_store(validator_store.clone())
|
||||
.beacon_nodes(beacon_nodes.clone())
|
||||
.runtime_context(context.service_context("block".into()))
|
||||
.graffiti(config.graffiti)
|
||||
.graffiti_file(config.graffiti_file.clone())
|
||||
.block_delay(config.block_delay)
|
||||
.build()?;
|
||||
.block_delay(config.block_delay);
|
||||
|
||||
// If we have proposer nodes, add them to the block service builder.
|
||||
if proposer_nodes_num > 0 {
|
||||
block_service_builder = block_service_builder.proposer_nodes(proposer_nodes.clone());
|
||||
}
|
||||
|
||||
let block_service = block_service_builder.build()?;
|
||||
|
||||
let attestation_service = AttestationServiceBuilder::new()
|
||||
.duties_service(duties_service.clone())
|
||||
@@ -461,7 +497,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
let sync_committee_service = SyncCommitteeService::new(
|
||||
duties_service.clone(),
|
||||
validator_store.clone(),
|
||||
slot_clock,
|
||||
slot_clock.clone(),
|
||||
beacon_nodes.clone(),
|
||||
context.service_context("sync_committee".into()),
|
||||
);
|
||||
@@ -482,6 +518,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
preparation_service,
|
||||
validator_store,
|
||||
config,
|
||||
slot_clock,
|
||||
http_api_listen_addr: None,
|
||||
})
|
||||
}
|
||||
@@ -545,6 +582,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
graffiti_flag: self.config.graffiti,
|
||||
spec: self.context.eth2_config.spec.clone(),
|
||||
config: self.config.http_api.clone(),
|
||||
slot_clock: self.slot_clock.clone(),
|
||||
log: log.clone(),
|
||||
_phantom: PhantomData,
|
||||
});
|
||||
@@ -579,13 +617,32 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
|
||||
async fn init_from_beacon_node<E: EthSpec>(
|
||||
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
|
||||
proposer_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
|
||||
context: &RuntimeContext<E>,
|
||||
) -> Result<(u64, Hash256), String> {
|
||||
loop {
|
||||
beacon_nodes.update_unready_candidates().await;
|
||||
proposer_nodes.update_unready_candidates().await;
|
||||
|
||||
let num_available = beacon_nodes.num_available().await;
|
||||
let num_total = beacon_nodes.num_total();
|
||||
if num_available > 0 {
|
||||
|
||||
let proposer_available = beacon_nodes.num_available().await;
|
||||
let proposer_total = beacon_nodes.num_total();
|
||||
|
||||
if proposer_total > 0 && proposer_available == 0 {
|
||||
warn!(
|
||||
context.log(),
|
||||
"Unable to connect to a proposer node";
|
||||
"retry in" => format!("{} seconds", RETRY_DELAY.as_secs()),
|
||||
"total_proposers" => proposer_total,
|
||||
"available_proposers" => proposer_available,
|
||||
"total_beacon_nodes" => num_total,
|
||||
"available_beacon_nodes" => num_available,
|
||||
);
|
||||
}
|
||||
|
||||
if num_available > 0 && proposer_available == 0 {
|
||||
info!(
|
||||
context.log(),
|
||||
"Initialized beacon node connections";
|
||||
@@ -593,6 +650,16 @@ async fn init_from_beacon_node<E: EthSpec>(
|
||||
"available" => num_available,
|
||||
);
|
||||
break;
|
||||
} else if num_available > 0 {
|
||||
info!(
|
||||
context.log(),
|
||||
"Initialized beacon node connections";
|
||||
"total" => num_total,
|
||||
"available" => num_available,
|
||||
"proposers_available" => proposer_available,
|
||||
"proposers_total" => proposer_total,
|
||||
);
|
||||
break;
|
||||
} else {
|
||||
warn!(
|
||||
context.log(),
|
||||
|
||||
Reference in New Issue
Block a user