mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-08 01:05:47 +00:00
Add comments
This commit is contained in:
@@ -990,6 +990,9 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
// Spawn the background task to compute selection proofs.
|
// Spawn the background task to compute selection proofs.
|
||||||
let subservice = duties_service.clone();
|
let subservice = duties_service.clone();
|
||||||
|
|
||||||
|
// Define a config to be pass to fill_in_selection_proofs.
|
||||||
|
// The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode)
|
||||||
|
// Other DVT applications, e.g., Anchor can pass in different configs to suit different needs.
|
||||||
let config = SelectionProofConfig {
|
let config = SelectionProofConfig {
|
||||||
lookahead_slot: if duties_service.distributed {
|
lookahead_slot: if duties_service.distributed {
|
||||||
SELECTION_PROOF_SLOT_LOOKAHEAD_DVT
|
SELECTION_PROOF_SLOT_LOOKAHEAD_DVT
|
||||||
@@ -1091,6 +1094,7 @@ async fn post_validator_duties_attester<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))
|
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create a helper function here to reduce code duplication for normal and distributed mode
|
||||||
fn process_duty_and_proof<E: EthSpec>(
|
fn process_duty_and_proof<E: EthSpec>(
|
||||||
attesters: &mut RwLockWriteGuard<AttesterMap>,
|
attesters: &mut RwLockWriteGuard<AttesterMap>,
|
||||||
result: Result<(AttesterData, Option<SelectionProof>), Error>,
|
result: Result<(AttesterData, Option<SelectionProof>), Error>,
|
||||||
@@ -1107,6 +1111,7 @@ fn process_duty_and_proof<E: EthSpec>(
|
|||||||
"Missing pubkey for duty and proof"
|
"Missing pubkey for duty and proof"
|
||||||
);
|
);
|
||||||
// Do not abort the entire batch for a single failure.
|
// Do not abort the entire batch for a single failure.
|
||||||
|
// return true means continue processing duties.
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -1157,6 +1162,7 @@ fn process_duty_and_proof<E: EthSpec>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
|
/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
|
||||||
///
|
///
|
||||||
/// Duties are computed in batches each slot. If a re-org is detected then the process will
|
/// Duties are computed in batches each slot. If a re-org is detected then the process will
|
||||||
@@ -1190,8 +1196,8 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
|
|
||||||
let lookahead_slot = current_slot + selection_lookahead;
|
let lookahead_slot = current_slot + selection_lookahead;
|
||||||
|
|
||||||
let relevant_duties = if duties_service.distributed {
|
let relevant_duties = if config.selections_endpoint {
|
||||||
// Remove old slot duties and only keep current duties
|
// Remove old slot duties and only keep current duties in distributed mode
|
||||||
duties_by_slot
|
duties_by_slot
|
||||||
.remove(&lookahead_slot)
|
.remove(&lookahead_slot)
|
||||||
.map(|duties| BTreeMap::from([(lookahead_slot, duties)]))
|
.map(|duties| BTreeMap::from([(lookahead_slot, duties)]))
|
||||||
@@ -1235,6 +1241,7 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
|
|
||||||
while let Some(result) = duty_and_proof_results.next().await {
|
while let Some(result) = duty_and_proof_results.next().await {
|
||||||
let mut attesters = duties_service.attesters.write();
|
let mut attesters = duties_service.attesters.write();
|
||||||
|
// if process_duty_and_proof returns false, exit the loop
|
||||||
if !process_duty_and_proof::<E>(
|
if !process_duty_and_proof::<E>(
|
||||||
&mut attesters,
|
&mut attesters,
|
||||||
result,
|
result,
|
||||||
@@ -1245,6 +1252,7 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// In normal (non-distributed case), sign selection proofs serially
|
||||||
let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten())
|
let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten())
|
||||||
.then(|duty| async {
|
.then(|duty| async {
|
||||||
let opt_selection_proof = make_selection_proof(
|
let opt_selection_proof = make_selection_proof(
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use crate::duties_service::{DutiesService, Error, SelectionProofConfig};
|
use crate::duties_service::{DutiesService, Error, SelectionProofConfig};
|
||||||
use beacon_node_fallback::BeaconNodeFallback;
|
// use beacon_node_fallback::BeaconNodeFallback;
|
||||||
use doppelganger_service::DoppelgangerStatus;
|
use doppelganger_service::DoppelgangerStatus;
|
||||||
use eth2::types::{Signature, SyncCommitteeSelection};
|
use eth2::types::{Signature, SyncCommitteeSelection};
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
@@ -352,10 +352,12 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
let sub_duties_service = duties_service.clone();
|
let sub_duties_service = duties_service.clone();
|
||||||
duties_service.context.executor.spawn(
|
duties_service.context.executor.spawn(
|
||||||
async move {
|
async move {
|
||||||
|
// The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode)
|
||||||
|
// Other DVT applications, e.g., Anchor can pass in different configs to suit different needs.
|
||||||
let config = SelectionProofConfig {
|
let config = SelectionProofConfig {
|
||||||
lookahead_slot: sub_duties_service
|
lookahead_slot: sub_duties_service
|
||||||
.sync_duties
|
.sync_duties
|
||||||
.aggregation_pre_compute_slots(),
|
.aggregation_pre_compute_slots(), // Use the current behaviour defined in the method
|
||||||
computation_offset: Duration::from_secs(0),
|
computation_offset: Duration::from_secs(0),
|
||||||
selections_endpoint: sub_duties_service.sync_duties.distributed,
|
selections_endpoint: sub_duties_service.sync_duties.distributed,
|
||||||
parallel_sign: sub_duties_service.sync_duties.distributed,
|
parallel_sign: sub_duties_service.sync_duties.distributed,
|
||||||
@@ -520,13 +522,13 @@ pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: Et
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create a helper function here to reduce code duplication for normal and distributed mode
|
||||||
pub async fn make_sync_selection_proof<T: SlotClock + 'static, E: EthSpec>(
|
pub async fn make_sync_selection_proof<T: SlotClock + 'static, E: EthSpec>(
|
||||||
duties_service: &Arc<DutiesService<T, E>>,
|
duties_service: &Arc<DutiesService<T, E>>,
|
||||||
duty: &SyncDuty,
|
duty: &SyncDuty,
|
||||||
proof_slot: Slot,
|
proof_slot: Slot,
|
||||||
subnet_id: SyncSubnetId,
|
subnet_id: SyncSubnetId,
|
||||||
config: &SelectionProofConfig,
|
config: &SelectionProofConfig,
|
||||||
_beacon_nodes: &Arc<BeaconNodeFallback<T, E>>,
|
|
||||||
) -> Option<SyncSelectionProof> {
|
) -> Option<SyncSelectionProof> {
|
||||||
let sync_selection_proof = duties_service
|
let sync_selection_proof = duties_service
|
||||||
.validator_store
|
.validator_store
|
||||||
@@ -554,12 +556,13 @@ pub async fn make_sync_selection_proof<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// In --distributed mode when we want to call the selections endpoint
|
// In distributed mode when we want to call the selections endpoint
|
||||||
if config.selections_endpoint {
|
if config.selections_endpoint {
|
||||||
debug!(
|
debug!(
|
||||||
"validator_index" = duty.validator_index,
|
"validator_index" = duty.validator_index,
|
||||||
"slot" = %proof_slot,
|
"slot" = %proof_slot,
|
||||||
"subcommittee_index" = *subnet_id,
|
"subcommittee_index" = *subnet_id,
|
||||||
|
// In distributed mode, this is partial selection proof
|
||||||
"partial selection proof" = ?Signature::from(selection_proof.clone()),
|
"partial selection proof" = ?Signature::from(selection_proof.clone()),
|
||||||
"Sending sync selection to middleware"
|
"Sending sync selection to middleware"
|
||||||
);
|
);
|
||||||
@@ -589,16 +592,16 @@ pub async fn make_sync_selection_proof<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
match middleware_response {
|
match middleware_response {
|
||||||
Ok(response) => {
|
Ok(response) => {
|
||||||
let response_data = &response.data[0];
|
let response_data = &response.data[0];
|
||||||
// The selection proof from middleware response will be a full selection proof
|
|
||||||
debug!(
|
debug!(
|
||||||
"validator_index" = response_data.validator_index,
|
"validator_index" = response_data.validator_index,
|
||||||
"slot" = %response_data.slot,
|
"slot" = %response_data.slot,
|
||||||
"subcommittee_index" = response_data.subcommittee_index,
|
"subcommittee_index" = response_data.subcommittee_index,
|
||||||
|
// The selection proof from middleware response will be a full selection proof
|
||||||
"full selection proof" = ?response_data.selection_proof,
|
"full selection proof" = ?response_data.selection_proof,
|
||||||
"Received sync selection from middleware"
|
"Received sync selection from middleware"
|
||||||
);
|
);
|
||||||
|
|
||||||
// Convert the response to a SyncSelectionProof so we can call the is_aggregator method
|
// Convert the response to a SyncSelectionProof
|
||||||
let full_selection_proof =
|
let full_selection_proof =
|
||||||
SyncSelectionProof::from(response_data.selection_proof.clone());
|
SyncSelectionProof::from(response_data.selection_proof.clone());
|
||||||
Some(full_selection_proof)
|
Some(full_selection_proof)
|
||||||
@@ -613,7 +616,7 @@ pub async fn make_sync_selection_proof<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// When calling the selections endpoint is not required, return the selection_proof
|
// When calling the selections endpoint is not required, the selection_proof is already a full selection proof
|
||||||
Some(selection_proof)
|
Some(selection_proof)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -648,7 +651,7 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
// Construct proof for prior slot.
|
// Construct proof for prior slot.
|
||||||
let proof_slot = slot - 1;
|
let proof_slot = slot - 1;
|
||||||
|
|
||||||
// Store all partial sync selection proofs so that it can be sent together later
|
// Calling the make_sync_selection_proof will return a full selection proof
|
||||||
for &subnet_id in &subnet_ids {
|
for &subnet_id in &subnet_ids {
|
||||||
let duties_service = duties_service.clone();
|
let duties_service = duties_service.clone();
|
||||||
futures_unordered.push(async move {
|
futures_unordered.push(async move {
|
||||||
@@ -658,7 +661,6 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
proof_slot,
|
proof_slot,
|
||||||
subnet_id,
|
subnet_id,
|
||||||
config_ref,
|
config_ref,
|
||||||
&duties_service.beacon_nodes,
|
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -749,7 +751,6 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
proof_slot,
|
proof_slot,
|
||||||
*subnet_id,
|
*subnet_id,
|
||||||
config_ref,
|
config_ref,
|
||||||
&duties_service_ref.beacon_nodes,
|
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user