diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index 82c1a81e33..7f3ca3b0aa 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -146,6 +146,28 @@ jobs: run: sudo npm install -g ganache-cli - name: Run the syncing simulator run: cargo run --release --bin simulator syncing-sim + doppelganger-protection-test: + name: doppelganger-protection-test + runs-on: ubuntu-latest + needs: cargo-fmt + steps: + - uses: actions/checkout@v1 + - name: Get latest version of stable Rust + run: rustup update stable + - name: Install ganache-cli + run: sudo npm install -g ganache-cli + - name: Install lighthouse and lcli + run: | + make + make install-lcli + - name: Run the doppelganger protection success test script + run: | + cd scripts/tests + ./doppelganger_protection.sh success + - name: Run the doppelganger protection failure test script + run: | + cd scripts/tests + ./doppelganger_protection.sh failure check-benchmarks: name: check-benchmarks runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index 6d7a1a833a..8cb2cfeba9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7336,6 +7336,7 @@ dependencies = [ "slog-async", "slog-term", "slot_clock", + "task_executor", "tempfile", "tokio 1.8.1", "tree_hash", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 750fefd6c6..aeb30bf2e6 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3443,6 +3443,28 @@ impl BeaconChain { let mut file = std::fs::File::create(file_name).unwrap(); self.dump_as_dot(&mut file); } + + /// Checks if attestations have been seen from the given `validator_index` at the + /// given `epoch`. + pub fn validator_seen_at_epoch(&self, validator_index: usize, epoch: Epoch) -> bool { + // It's necessary to assign these checks to intermediate variables to avoid a deadlock. 
+ // + // See: https://github.com/sigp/lighthouse/pull/2230#discussion_r620013993 + let attested = self + .observed_attesters + .read() + .index_seen_at_epoch(validator_index, epoch); + let aggregated = self + .observed_aggregators + .read() + .index_seen_at_epoch(validator_index, epoch); + let produced_block = self + .observed_block_producers + .read() + .index_seen_at_epoch(validator_index as u64, epoch); + + attested || aggregated || produced_block + } } impl Drop for BeaconChain { diff --git a/beacon_node/beacon_chain/src/observed_attesters.rs b/beacon_node/beacon_chain/src/observed_attesters.rs index 72892af6a8..043105992d 100644 --- a/beacon_node/beacon_chain/src/observed_attesters.rs +++ b/beacon_node/beacon_chain/src/observed_attesters.rs @@ -381,6 +381,16 @@ impl AutoPruningEpochContainer { pub(crate) fn get_lowest_permissible(&self) -> Epoch { self.lowest_permissible_epoch } + + /// Returns `true` if the given `index` has been stored in `self` at `epoch`. + /// + /// This is useful for doppelganger detection. + pub fn index_seen_at_epoch(&self, index: usize, epoch: Epoch) -> bool { + self.items + .get(&epoch) + .map(|item| item.contains(index)) + .unwrap_or(false) + } } /// A container that stores some number of `V` items. 
diff --git a/beacon_node/beacon_chain/src/observed_block_producers.rs b/beacon_node/beacon_chain/src/observed_block_producers.rs index 66e036f36f..b5995121b9 100644 --- a/beacon_node/beacon_chain/src/observed_block_producers.rs +++ b/beacon_node/beacon_chain/src/observed_block_producers.rs @@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; -use types::{BeaconBlockRef, EthSpec, Slot, Unsigned}; +use types::{BeaconBlockRef, Epoch, EthSpec, Slot, Unsigned}; #[derive(Debug, PartialEq)] pub enum Error { @@ -114,6 +114,15 @@ impl ObservedBlockProducers { self.finalized_slot = finalized_slot; self.items.retain(|slot, _set| *slot > finalized_slot); } + + /// Returns `true` if the given `validator_index` has been stored in `self` at `epoch`. + /// + /// This is useful for doppelganger detection. + pub fn index_seen_at_epoch(&self, validator_index: u64, epoch: Epoch) -> bool { + self.items.iter().any(|(slot, producers)| { + slot.epoch(E::slots_per_epoch()) == epoch && producers.contains(&validator_index) + }) + } } #[cfg(test)] diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 00e4854c7b..998b95bea9 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1907,6 +1907,49 @@ pub fn serve( }, ); + // POST lighthouse/liveness + let post_lighthouse_liveness = warp::path("lighthouse") + .and(warp::path("liveness")) + .and(warp::path::end()) + .and(warp::body::json()) + .and(chain_filter.clone()) + .and_then( + |request_data: api_types::LivenessRequestData, chain: Arc>| { + blocking_json_task(move || { + // Ensure the request is for either the current, previous or next epoch. 
+ let current_epoch = chain + .epoch() + .map_err(warp_utils::reject::beacon_chain_error)?; + let prev_epoch = current_epoch.saturating_sub(Epoch::new(1)); + let next_epoch = current_epoch.saturating_add(Epoch::new(1)); + + if request_data.epoch < prev_epoch || request_data.epoch > next_epoch { + return Err(warp_utils::reject::custom_bad_request(format!( + "request epoch {} is more than one epoch from the current epoch {}", + request_data.epoch, current_epoch + ))); + } + + let liveness: Vec = request_data + .indices + .iter() + .cloned() + .map(|index| { + let is_live = + chain.validator_seen_at_epoch(index as usize, request_data.epoch); + api_types::LivenessResponseData { + index: index as u64, + epoch: request_data.epoch, + is_live, + } + }) + .collect(); + + Ok(api_types::GenericResponse::from(liveness)) + }) + }, + ); + // GET lighthouse/health let get_lighthouse_health = warp::path("lighthouse") .and(warp::path("health")) @@ -2249,6 +2292,7 @@ pub fn serve( .or(post_beacon_pool_voluntary_exits.boxed()) .or(post_validator_duties_attester.boxed()) .or(post_validator_aggregate_and_proofs.boxed()) + .or(post_lighthouse_liveness.boxed()) .or(post_validator_beacon_committee_subscriptions.boxed()), )) .recover(warp_utils::reject::handle_rejection) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 205b638c10..470afbf092 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -2149,6 +2149,71 @@ impl ApiTester { self } + pub async fn test_post_lighthouse_liveness(self) -> Self { + let epoch = self.chain.epoch().unwrap(); + let head_state = self.chain.head_beacon_state().unwrap(); + let indices = (0..head_state.validators().len()) + .map(|i| i as u64) + .collect::>(); + + // Construct the expected response + let expected: Vec = head_state + .validators() + .iter() + .enumerate() + .map(|(index, _)| LivenessResponseData { + index: index as u64, + is_live: false, + epoch, + }) + .collect(); 
+ + let result = self + .client + .post_lighthouse_liveness(indices.as_slice(), epoch) + .await + .unwrap() + .data; + + assert_eq!(result, expected); + + // Attest to the current slot + self.client + .post_beacon_pool_attestations(self.attestations.as_slice()) + .await + .unwrap(); + + let result = self + .client + .post_lighthouse_liveness(indices.as_slice(), epoch) + .await + .unwrap() + .data; + + let committees = head_state + .get_beacon_committees_at_slot(self.chain.slot().unwrap()) + .unwrap(); + let attesting_validators: Vec = committees + .into_iter() + .map(|committee| committee.committee.iter().cloned()) + .flatten() + .collect(); + // All attesters should now be considered live + let expected = expected + .into_iter() + .map(|mut a| { + if attesting_validators.contains(&(a.index as usize)) { + a.is_live = true; + } + a + }) + .collect::>(); + + assert_eq!(result, expected); + + self + } + pub async fn test_get_events(self) -> Self { // Subscribe to all events let topics = vec![ @@ -2635,5 +2700,7 @@ async fn lighthouse_endpoints() { .test_get_lighthouse_beacon_states_ssz() .await .test_get_lighthouse_staking() + .await + .test_post_lighthouse_liveness() .await; } diff --git a/book/src/api-lighthouse.md b/book/src/api-lighthouse.md index dab55fb470..bc19d18281 100644 --- a/book/src/api-lighthouse.md +++ b/book/src/api-lighthouse.md @@ -329,3 +329,28 @@ curl -X GET "http://localhost:5052/lighthouse/beacon/states/0/ssz" | jq ``` *Example omitted for brevity, the body simply contains SSZ bytes.* + +### `/lighthouse/liveness` + +POST request that checks if any of the given validators have attested in the given epoch. Returns a list +of objects, each including the validator index, epoch, and `is_live` status of a requested validator. + +This endpoint is used in doppelganger detection, and will only provide accurate information for the +current, previous, or next epoch. 
+ + +```bash +curl -X POST "http://localhost:5052/lighthouse/liveness" -d '{"indices":["0","1"],"epoch":"1"}' -H "content-type: application/json" | jq +``` + +```json +{ + "data": [ + { + "index": "0", + "epoch": "1", + "is_live": true + } + ] +} +``` \ No newline at end of file diff --git a/book/src/api-vc-endpoints.md b/book/src/api-vc-endpoints.md index 033f5d7657..58ad76ce5a 100644 --- a/book/src/api-vc-endpoints.md +++ b/book/src/api-vc-endpoints.md @@ -351,7 +351,7 @@ Typical Responses | 200 "checksum": { "function": "sha256", "params": { - + }, "message": "abadc1285fd38b24a98ac586bda5b17a8f93fc1ff0778803dc32049578981236" }, diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index ea4f013a1e..08b4bcaead 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -83,6 +83,7 @@ impl fmt::Display for Error { pub struct Timeouts { pub attestation: Duration, pub attester_duties: Duration, + pub liveness: Duration, pub proposal: Duration, pub proposer_duties: Duration, } @@ -92,6 +93,7 @@ impl Timeouts { Timeouts { attestation: timeout, attester_duties: timeout, + liveness: timeout, proposal: timeout, proposer_duties: timeout, } @@ -1103,6 +1105,30 @@ impl BeaconNodeHttpClient { .await } + /// `POST lighthouse/liveness` + pub async fn post_lighthouse_liveness( + &self, + ids: &[u64], + epoch: Epoch, + ) -> Result>, Error> { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? 
+ .push("lighthouse") + .push("liveness"); + + self.post_with_timeout_and_response( + path, + &LivenessRequestData { + indices: ids.to_vec(), + epoch, + }, + self.timeouts.liveness, + ) + .await + } + /// `POST validator/duties/attester/{epoch}` pub async fn post_validator_duties_attester( &self, diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index d31407645d..264f6c5870 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -847,6 +847,21 @@ impl FromStr for Accept { } } +#[derive(Debug, Serialize, Deserialize)] +pub struct LivenessRequestData { + pub epoch: Epoch, + #[serde(with = "serde_utils::quoted_u64_vec")] + pub indices: Vec, +} + +#[derive(PartialEq, Debug, Serialize, Deserialize)] +pub struct LivenessResponseData { + #[serde(with = "serde_utils::quoted_u64")] + pub index: u64, + pub epoch: Epoch, + pub is_live: bool, +} + #[cfg(test)] mod tests { use super::*; diff --git a/lcli/src/main.rs b/lcli/src/main.rs index 48ca0338dd..adc0e04102 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -344,6 +344,13 @@ fn main() { non-default.", ), ) + .arg( + Arg::with_name("seconds-per-slot") + .long("seconds-per-slot") + .value_name("SECONDS") + .takes_value(true) + .help("Eth2 slot time"), + ) .arg( Arg::with_name("seconds-per-eth1-block") .long("seconds-per-eth1-block") diff --git a/lcli/src/new_testnet.rs b/lcli/src/new_testnet.rs index 777633ca82..e37145bf0d 100644 --- a/lcli/src/new_testnet.rs +++ b/lcli/src/new_testnet.rs @@ -43,6 +43,7 @@ pub fn run(testnet_dir_path: PathBuf, matches: &ArgMatches) -> Resul maybe_update!("genesis-delay", genesis_delay); maybe_update!("eth1-id", deposit_chain_id); maybe_update!("eth1-id", deposit_network_id); + maybe_update!("seconds-per-slot", seconds_per_slot); maybe_update!("seconds-per-eth1-block", seconds_per_eth1_block); if let Some(v) = parse_ssz_optional(matches, "genesis-fork-version")? 
{ diff --git a/lighthouse/tests/validator_client.rs b/lighthouse/tests/validator_client.rs index ac255dffa0..0fde9b901d 100644 --- a/lighthouse/tests/validator_client.rs +++ b/lighthouse/tests/validator_client.rs @@ -378,3 +378,16 @@ pub fn malloc_tuning_flag() { // effects of it. .run(); } +#[test] +fn doppelganger_protection_flag() { + CommandLineTest::new() + .flag("enable-doppelganger-protection", None) + .run() + .with_config(|config| assert!(config.enable_doppelganger_protection)); +} +#[test] +fn no_doppelganger_protection_flag() { + CommandLineTest::new() + .run() + .with_config(|config| assert!(!config.enable_doppelganger_protection)); +} diff --git a/scripts/local_testnet/README.md b/scripts/local_testnet/README.md index 6727e2d28c..5cb8407a98 100644 --- a/scripts/local_testnet/README.md +++ b/scripts/local_testnet/README.md @@ -20,7 +20,7 @@ Start a local eth1 ganache server ./ganache_test_node.sh ``` -Assuming you are happy with the configuration in `var.env`, deploy the deposit contract, make deposits, +Assuming you are happy with the configuration in `var.env`, deploy the deposit contract, make deposits, create the testnet directory, genesis state and validator keys with: ```bash diff --git a/scripts/local_testnet/bootnode.sh b/scripts/local_testnet/bootnode.sh index 7ade1f0db5..9558a487bc 100755 --- a/scripts/local_testnet/bootnode.sh +++ b/scripts/local_testnet/bootnode.sh @@ -30,4 +30,4 @@ exec lighthouse boot_node \ --testnet-dir $TESTNET_DIR \ --port $BOOTNODE_PORT \ --listen-address 127.0.0.1 \ - --network-dir $DATADIR/bootnode \ \ No newline at end of file + --network-dir $DATADIR/bootnode \ diff --git a/scripts/local_testnet/ganache_test_node.sh b/scripts/local_testnet/ganache_test_node.sh index 43a0e0e5dc..762700dbd6 100755 --- a/scripts/local_testnet/ganache_test_node.sh +++ b/scripts/local_testnet/ganache_test_node.sh @@ -2,12 +2,12 @@ source ./vars.env -ganache-cli \ +exec ganache-cli \ --defaultBalanceEther 1000000000 \ --gasLimit 
1000000000 \ --accounts 10 \ --mnemonic "$ETH1_NETWORK_MNEMONIC" \ --port 8545 \ - --blockTime 3 \ + --blockTime $SECONDS_PER_ETH1_BLOCK \ --networkId "$NETWORK_ID" \ --chainId "$NETWORK_ID" diff --git a/scripts/local_testnet/setup.sh b/scripts/local_testnet/setup.sh index 4e86ec8806..a171fb1b08 100755 --- a/scripts/local_testnet/setup.sh +++ b/scripts/local_testnet/setup.sh @@ -33,7 +33,8 @@ lcli \ --altair-fork-epoch $ALTAIR_FORK_EPOCH \ --eth1-id $NETWORK_ID \ --eth1-follow-distance 1 \ - --seconds-per-eth1-block 1 \ + --seconds-per-slot $SECONDS_PER_SLOT \ + --seconds-per-eth1-block $SECONDS_PER_ETH1_BLOCK \ --force echo Specification generated at $TESTNET_DIR. diff --git a/scripts/local_testnet/validator_client.sh b/scripts/local_testnet/validator_client.sh index 98629b4b3b..6755384be5 100755 --- a/scripts/local_testnet/validator_client.sh +++ b/scripts/local_testnet/validator_client.sh @@ -16,4 +16,5 @@ exec lighthouse \ --datadir $1 \ --testnet-dir $TESTNET_DIR \ --init-slashing-protection \ - --beacon-nodes $2 + --beacon-nodes $2 \ + $VC_ARGS diff --git a/scripts/local_testnet/vars.env b/scripts/local_testnet/vars.env index 3152dd49f4..5c2ed22bd6 100644 --- a/scripts/local_testnet/vars.env +++ b/scripts/local_testnet/vars.env @@ -28,3 +28,9 @@ NETWORK_ID=4242 # Hard fork configuration ALTAIR_FORK_EPOCH=18446744073709551615 + +# Seconds per Eth2 slot +SECONDS_PER_SLOT=3 + +# Seconds per Eth1 block +SECONDS_PER_ETH1_BLOCK=1 diff --git a/scripts/tests/doppelganger_protection.sh b/scripts/tests/doppelganger_protection.sh new file mode 100755 index 0000000000..d10249bdca --- /dev/null +++ b/scripts/tests/doppelganger_protection.sh @@ -0,0 +1,141 @@ +#!/usr/bin/env bash + +# Requires `lighthouse`, ``lcli`, `ganache-cli`, `curl`, `jq` + +BEHAVIOR=$1 + +if [[ "$BEHAVIOR" != "success" ]] && [[ "$BEHAVIOR" != "failure" ]]; then + echo "Usage: doppelganger_protection.sh [success|failure]" + exit 1 +fi + +source ./vars.env + +../local_testnet/clean.sh + +echo "Starting 
ganache" + +../local_testnet/ganache_test_node.sh &> /dev/null & +GANACHE_PID=$! + +# Wait for ganache to start +sleep 5 + +echo "Setting up local testnet" + +../local_testnet/setup.sh + +# Duplicate this directory so slashing protection doesn't keep us from re-using validator keys +cp -R $HOME/.lighthouse/local-testnet/node_1 $HOME/.lighthouse/local-testnet/node_1_doppelganger + +echo "Starting bootnode" + +../local_testnet/bootnode.sh &> /dev/null & +BOOT_PID=$! + +# wait for the bootnode to start +sleep 10 + +echo "Starting local beacon nodes" + +../local_testnet/beacon_node.sh $HOME/.lighthouse/local-testnet/node_1 9000 8000 &> /dev/null & +BEACON_PID=$! +../local_testnet/beacon_node.sh $HOME/.lighthouse/local-testnet/node_2 9100 8100 &> /dev/null & +BEACON_PID2=$! +../local_testnet/beacon_node.sh $HOME/.lighthouse/local-testnet/node_3 9200 8200 &> /dev/null & +BEACON_PID3=$! + +echo "Starting local validator clients" + +../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_1 http://localhost:8000 &> /dev/null & +VALIDATOR_1_PID=$! +../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_2 http://localhost:8100 &> /dev/null & +VALIDATOR_2_PID=$! +../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_3 http://localhost:8200 &> /dev/null & +VALIDATOR_3_PID=$! + +echo "Waiting an epoch before starting the next validator client" +sleep $(( $SECONDS_PER_SLOT * 32 )) + +if [[ "$BEHAVIOR" == "failure" ]]; then + + echo "Starting the doppelganger validator client" + + # Use same keys as keys from VC1, but connect to BN2 + # This process should not last longer than 2 epochs + timeout $(( $SECONDS_PER_SLOT * 32 * 2 )) ../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_1_doppelganger http://localhost:8100 + DOPPELGANGER_EXIT=$? 
+
+  echo "Shutting down"
+
+  # Cleanup
+  kill $BOOT_PID $BEACON_PID $BEACON_PID2 $BEACON_PID3 $GANACHE_PID $VALIDATOR_1_PID $VALIDATOR_2_PID $VALIDATOR_3_PID
+
+  echo "Done"
+
+  if [[ $DOPPELGANGER_EXIT -eq 124 ]]; then
+    exit 1
+  fi
+fi
+
+if [[ "$BEHAVIOR" == "success" ]]; then
+
+  echo "Starting the last validator client"
+
+  ../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_4 http://localhost:8100 &
+  VALIDATOR_4_PID=$!
+  DOPPELGANGER_FAILURE=0
+
+  # Sleep three epochs, then make sure no validators were active in epoch 2 (doppelganger protection
+  # should still be delaying them). Use `is_previous_epoch_target_attester` from epoch 3 for a complete view of epoch 2 inclusion.
+  #
+  # See: https://lighthouse-book.sigmaprime.io/validator-inclusion.html
+  echo "Waiting three epochs..."
+  sleep $(( $SECONDS_PER_SLOT * 32 * 3 ))
+
+  PREVIOUS_DIR=$(pwd)
+  cd $HOME/.lighthouse/local-testnet/node_4/validators
+  for val in 0x*; do
+    [[ -e $val ]] || continue
+    curl -s localhost:8100/lighthouse/validator_inclusion/3/$val | jq | grep -q '"is_previous_epoch_target_attester": false'
+    IS_ATTESTER=$?
+    if [[ $IS_ATTESTER -eq 0 ]]; then
+        echo "$val did not attest in epoch 2."
+    else
+        echo "ERROR! $val did attest in epoch 2."
+        DOPPELGANGER_FAILURE=1
+    fi
+  done
+
+  # Sleep two epochs, then make sure all validators were active in epoch 4. Use
+  # `is_previous_epoch_target_attester` from epoch 5 for a complete view of epoch 4 inclusion.
+  #
+  # See: https://lighthouse-book.sigmaprime.io/validator-inclusion.html
+  echo "Waiting two more epochs..."
+  sleep $(( $SECONDS_PER_SLOT * 32 * 2 ))
+  for val in 0x*; do
+    [[ -e $val ]] || continue
+    curl -s localhost:8100/lighthouse/validator_inclusion/5/$val | jq | grep -q '"is_previous_epoch_target_attester": true'
+    IS_ATTESTER=$?
+    if [[ $IS_ATTESTER -eq 0 ]]; then
+        echo "$val attested in epoch 4."
+    else
+        echo "ERROR! $val did not attest in epoch 4." 
+ DOPPELGANGER_FAILURE=1 + fi + done + + echo "Shutting down" + + # Cleanup + cd $PREVIOUS_DIR + kill $BOOT_PID $BEACON_PID $BEACON_PID2 $BEACON_PID3 $GANACHE_PID $VALIDATOR_1_PID $VALIDATOR_2_PID $VALIDATOR_3_PID $VALIDATOR_4_PID + + echo "Done" + + if [[ $DOPPELGANGER_FAILURE -eq 1 ]]; then + exit 1 + fi +fi + +exit 0 diff --git a/scripts/tests/vars.env b/scripts/tests/vars.env new file mode 100644 index 0000000000..7e11393035 --- /dev/null +++ b/scripts/tests/vars.env @@ -0,0 +1,39 @@ +# Base directories for the validator keys and secrets +DATADIR=~/.lighthouse/local-testnet + +# Directory for the eth2 config +TESTNET_DIR=$DATADIR/testnet + +# Mnemonic for the ganache test network +ETH1_NETWORK_MNEMONIC="vast thought differ pull jewel broom cook wrist tribe word before omit" + +# Hardcoded deposit contract based on ETH1_NETWORK_MNEMONIC +DEPOSIT_CONTRACT_ADDRESS=8c594691c0e592ffa21f153a16ae41db5befcaaa + +GENESIS_FORK_VERSION=0x42424242 + +VALIDATOR_COUNT=80 +GENESIS_VALIDATOR_COUNT=80 + +# Number of validator client instances that you intend to run +NODE_COUNT=4 + +GENESIS_DELAY=0 + +# Port for P2P communication with bootnode +BOOTNODE_PORT=4242 + +# Network ID and Chain ID of local eth1 test network +NETWORK_ID=4242 + +# Hard fork configuration +ALTAIR_FORK_EPOCH=18446744073709551615 + +# Seconds per Eth2 slot +SECONDS_PER_SLOT=3 + +# Seconds per Eth1 block +SECONDS_PER_ETH1_BLOCK=1 + +# Enable doppelganger detection +VC_ARGS=" --enable-doppelganger-protection " diff --git a/testing/simulator/src/no_eth1_sim.rs b/testing/simulator/src/no_eth1_sim.rs index 09ab132bc3..2eda987d49 100644 --- a/testing/simulator/src/no_eth1_sim.rs +++ b/testing/simulator/src/no_eth1_sim.rs @@ -125,7 +125,7 @@ pub fn run_no_eth1_sim(matches: &ArgMatches) -> Result<(), String> { network.clone(), Epoch::new(4).start_slot(MainnetEthSpec::slots_per_epoch()), slot_duration, - ) + ), ); finalization?; block_prod?; diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml 
index 792255cf07..801a0954b0 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -66,3 +66,4 @@ lazy_static = "1.4.0" fallback = { path = "../common/fallback" } monitoring_api = { path = "../common/monitoring_api" } sensitive_url = { path = "../common/sensitive_url" } +task_executor = { path = "../common/task_executor" } diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index af016f7cf4..50f127db5b 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -20,7 +20,7 @@ use types::{ /// Builds an `AttestationService`. pub struct AttestationServiceBuilder { duties_service: Option>>, - validator_store: Option>, + validator_store: Option>>, slot_clock: Option, beacon_nodes: Option>>, context: Option>, @@ -42,7 +42,7 @@ impl AttestationServiceBuilder { self } - pub fn validator_store(mut self, store: ValidatorStore) -> Self { + pub fn validator_store(mut self, store: Arc>) -> Self { self.validator_store = Some(store); self } @@ -88,7 +88,7 @@ impl AttestationServiceBuilder { /// Helper to minimise `Arc` usage. 
pub struct Inner { duties_service: Arc>, - validator_store: ValidatorStore, + validator_store: Arc>, slot_clock: T, beacon_nodes: Arc>, context: RuntimeContext, @@ -377,25 +377,22 @@ impl AttestationService { signature: AggregateSignature::infinity(), }; - if self - .validator_store - .sign_attestation( - &duty.pubkey, - duty.validator_committee_index as usize, - &mut attestation, - current_epoch, - ) - .is_some() - { - attestations.push(attestation); - } else { + if let Err(e) = self.validator_store.sign_attestation( + duty.pubkey, + duty.validator_committee_index as usize, + &mut attestation, + current_epoch, + ) { crit!( log, "Failed to sign attestation"; + "error" => ?e, "committee_index" => committee_index, "slot" => slot.as_u64(), ); continue; + } else { + attestations.push(attestation); } } @@ -497,17 +494,22 @@ impl AttestationService { continue; } - if let Some(aggregate) = self.validator_store.produce_signed_aggregate_and_proof( - &duty.pubkey, + match self.validator_store.produce_signed_aggregate_and_proof( + duty.pubkey, duty.validator_index, aggregated_attestation.clone(), selection_proof.clone(), ) { - signed_aggregate_and_proofs.push(aggregate); - } else { - crit!(log, "Failed to sign attestation"); - continue; - }; + Ok(aggregate) => signed_aggregate_and_proofs.push(aggregate), + Err(e) => { + crit!( + log, + "Failed to sign attestation"; + "error" => ?e + ); + continue; + } + } } if !signed_aggregate_and_proofs.is_empty() { diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 091a89c634..f102df18b1 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -5,7 +5,6 @@ use crate::{ use crate::{http_metrics::metrics, validator_store::ValidatorStore}; use environment::RuntimeContext; use eth2::types::Graffiti; -use futures::TryFutureExt; use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; use std::ops::Deref; @@ -15,7 +14,7 @@ use 
types::{EthSpec, PublicKeyBytes, Slot}; /// Builds a `BlockService`. pub struct BlockServiceBuilder { - validator_store: Option>, + validator_store: Option>>, slot_clock: Option>, beacon_nodes: Option>>, context: Option>, @@ -35,7 +34,7 @@ impl BlockServiceBuilder { } } - pub fn validator_store(mut self, store: ValidatorStore) -> Self { + pub fn validator_store(mut self, store: Arc>) -> Self { self.validator_store = Some(store); self } @@ -89,7 +88,7 @@ impl BlockServiceBuilder { /// Helper to minimise `Arc` usage. pub struct Inner { - validator_store: ValidatorStore, + validator_store: Arc>, slot_clock: Arc, beacon_nodes: Arc>, context: RuntimeContext, @@ -207,15 +206,15 @@ impl BlockService { let service = self.clone(); let log = log.clone(); self.inner.context.executor.spawn( - service - .publish_block(slot, validator_pubkey) - .unwrap_or_else(move |e| { + async move { + if let Err(e) = service.publish_block(slot, validator_pubkey).await { crit!( log, "Error whilst producing block"; "message" => e ); - }), + } + }, "block service", ); } @@ -240,8 +239,8 @@ impl BlockService { let randao_reveal = self .validator_store - .randao_reveal(&validator_pubkey, slot.epoch(E::slots_per_epoch())) - .ok_or("Unable to produce randao reveal")? + .randao_reveal(validator_pubkey, slot.epoch(E::slots_per_epoch())) + .map_err(|e| format!("Unable to produce randao reveal signature: {:?}", e))? 
.into(); let graffiti = self @@ -276,8 +275,8 @@ impl BlockService { let signed_block = self_ref .validator_store - .sign_block(validator_pubkey_ref, block, current_slot) - .ok_or("Unable to sign block")?; + .sign_block(*validator_pubkey_ref, block, current_slot) + .map_err(|e| format!("Unable to sign block: {:?}", e))?; let _post_timer = metrics::start_timer_vec( &metrics::BLOCK_SERVICE_TIMES, diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index df346cfeb9..386a20bf06 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -216,4 +216,19 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { and never provide an untrusted URL.") .takes_value(true), ) + .arg( + Arg::with_name("enable-doppelganger-protection") + .long("enable-doppelganger-protection") + .value_name("ENABLE_DOPPELGANGER_PROTECTION") + .help("If this flag is set, Lighthouse will delay startup for three epochs and \ + monitor for messages on the network by any of the validators managed by this \ + client. This will result in three (possibly four) epochs worth of missed \ + attestations. If an attestation is detected during this period, it means it is \ + very likely that you are running a second validator client with the same keys. \ + This validator client will immediately shutdown if this is detected in order \ + to avoid potentially committing a slashable offense. Use this flag in order to \ + ENABLE this functionality, without this flag Lighthouse will begin attesting \ + immediately.") + .takes_value(false), + ) } diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index c68dc6a428..06f91e2bb9 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -47,6 +47,9 @@ pub struct Config { pub http_metrics: http_metrics::Config, /// Configuration for sending metrics to a remote explorer endpoint. 
pub monitoring_api: Option, + /// If true, enable functionality that monitors the network for attestations or proposals from + /// any of the validators managed by this client before starting up. + pub enable_doppelganger_protection: bool, } impl Default for Config { @@ -76,6 +79,7 @@ impl Default for Config { http_api: <_>::default(), http_metrics: <_>::default(), monitoring_api: None, + enable_doppelganger_protection: false, } } } @@ -264,6 +268,10 @@ impl Config { }); } + if cli_args.is_present("enable-doppelganger-protection") { + config.enable_doppelganger_protection = true; + } + Ok(config) } } diff --git a/validator_client/src/doppelganger_service.rs b/validator_client/src/doppelganger_service.rs new file mode 100644 index 0000000000..1281be00b7 --- /dev/null +++ b/validator_client/src/doppelganger_service.rs @@ -0,0 +1,1325 @@ +//! The "Doppelganger" service is an **imperfect** mechanism to try and prevent the validator client +//! from starting whilst any of its validators are actively producing messages on the network. +//! +//! The mechanism works roughly like so: when the validator client starts or a new validator is +//! added, that validator is assigned a number of "remaining epochs". The doppelganger service +//! periodically polls the beacon node to see if that validator has been observed to produce +//! blocks/attestations in each epoch. After the doppelganger service is confident that an epoch has +//! passed without observing that validator, it will decrease the remaining epochs by one. Once the +//! remaining epochs is zero, the doppelganger will consider that validator to be safe-enough to +//! start. +//! +//! If a doppelganger is detected, the entire validator client will exit. +//! +//! For validators started during the genesis epoch, there is **no doppelganger protection!**. This +//! prevents a stale-mate where all validators will cease to function for a few epochs and then all +//! start at the same time. +//! +//! ## Caveat +//! +//! 
Presently doppelganger protection will never advance if the call at the last slot of each epoch +//! fails. This call is critical to ensuring that validators are able to start performing. +//! +//! ## Disclaimer +//! +//! The Doppelganger service is not perfect. It makes assumptions that any existing validator is +//! performing their duties as required and that the network is able to relay those messages to the +//! beacon node. Among other loop-holes, two validator clients started at the same time will not +//! detect each other. +//! +//! Doppelganger protection is a best-effort, last-line-of-defence mitigation. Do not rely upon it. + +use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; +use crate::validator_store::ValidatorStore; +use environment::RuntimeContext; +use eth2::types::LivenessResponseData; +use parking_lot::RwLock; +use slog::{crit, error, info, Logger}; +use slot_clock::SlotClock; +use std::collections::HashMap; +use std::future::Future; +use std::sync::Arc; +use task_executor::ShutdownReason; +use tokio::time::sleep; +use types::{Epoch, EthSpec, PublicKeyBytes, Slot}; + +/// A wrapper around `PublicKeyBytes` which encodes information about the status of a validator +/// pubkey with regards to doppelganger protection. +#[derive(Debug, PartialEq)] +pub enum DoppelgangerStatus { + /// Doppelganger protection has approved this for signing. + /// + /// This is because the service has waited some period of time to + /// detect other instances of this key on the network. + SigningEnabled(PublicKeyBytes), + /// Doppelganger protection is still waiting to detect other instances. + /// + /// Do not use this pubkey for signing slashable messages!! + /// + /// However, it can safely be used for other non-slashable operations (e.g., collecting duties + /// or subscribing to subnets). + SigningDisabled(PublicKeyBytes), + /// This pubkey is unknown to the doppelganger service. 
+ /// + /// This represents a serious internal error in the program. This validator will be permanently + /// disabled! + UnknownToDoppelganger(PublicKeyBytes), +} + +impl DoppelgangerStatus { + /// Only return a pubkey if it is explicitly safe for doppelganger protection. + /// + /// If `Some(pubkey)` is returned, doppelganger has declared it safe for signing. + /// + /// ## Note + /// + /// "Safe" is only best-effort by doppelganger. There is no guarantee that a doppelganger + /// doesn't exist. + pub fn only_safe(self) -> Option { + match self { + DoppelgangerStatus::SigningEnabled(pubkey) => Some(pubkey), + DoppelgangerStatus::SigningDisabled(_) => None, + DoppelgangerStatus::UnknownToDoppelganger(_) => None, + } + } + + /// Returns a key regardless of whether or not doppelganger has approved it. Such a key might be + /// used for signing non-slashable messages, duties collection or other activities. + /// + /// If the validator is unknown to doppelganger then `None` will be returned. + pub fn ignored(self) -> Option { + match self { + DoppelgangerStatus::SigningEnabled(pubkey) => Some(pubkey), + DoppelgangerStatus::SigningDisabled(pubkey) => Some(pubkey), + DoppelgangerStatus::UnknownToDoppelganger(_) => None, + } + } + + /// Only return a pubkey if it will not be used for signing due to doppelganger detection. + pub fn only_unsafe(self) -> Option { + match self { + DoppelgangerStatus::SigningEnabled(_) => None, + DoppelgangerStatus::SigningDisabled(pubkey) => Some(pubkey), + DoppelgangerStatus::UnknownToDoppelganger(pubkey) => Some(pubkey), + } + } +} + +struct LivenessResponses { + current_epoch_responses: Vec, + previous_epoch_responses: Vec, +} + +/// The number of epochs that must be checked before we assume that there are no other duplicate +/// validators on the network. +pub const DEFAULT_REMAINING_DETECTION_EPOCHS: u64 = 1; + +/// Store the per-validator status of doppelganger checking. 
+#[derive(Debug, PartialEq)] +pub struct DoppelgangerState { + /// The next epoch for which the validator should be checked for liveness. + /// + /// Whilst `self.remaining_epochs > 0`, if a validator is found to be live in this epoch or any + /// following then we consider them to have an active doppelganger. + /// + /// Regardless of `self.remaining_epochs`, never indicate for a doppelganger for epochs that are + /// below `next_check_epoch`. This is to avoid the scenario where a user reboots their VC inside + /// a single epoch and we detect the activity of that previous process as doppelganger activity, + /// even when it's not running anymore. + next_check_epoch: Epoch, + /// The number of epochs that must be checked before this validator is considered + /// doppelganger-free. + remaining_epochs: u64, +} + +impl DoppelgangerState { + /// Returns `true` if the validator is *not* safe to sign. + fn requires_further_checks(&self) -> bool { + self.remaining_epochs > 0 + } + + /// Updates the `DoppelgangerState` to consider the given `Epoch`'s doppelganger checks + /// completed. + fn complete_detection_in_epoch(&mut self, epoch: Epoch) { + // The validator has successfully completed doppelganger checks for a new epoch. + self.remaining_epochs = self.remaining_epochs.saturating_sub(1); + + // Since we just satisfied the `previous_epoch`, the next epoch to satisfy should be + // the one following that. + self.next_check_epoch = epoch.saturating_add(1_u64); + } +} + +/// Perform two requests to the BN to obtain the liveness data for `validator_indices`. One +/// request will pertain to the `current_epoch`, the other to the `previous_epoch`. +/// +/// If the BN fails to respond to either of these requests, simply return an empty response. +/// This behaviour is to help prevent spurious failures on the BN from needlessly preventing +/// doppelganger progression. 
+async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>( + beacon_nodes: Arc>, + log: Logger, + current_epoch: Epoch, + validator_indices: Vec, +) -> LivenessResponses { + let validator_indices = validator_indices.as_slice(); + + let previous_epoch = current_epoch.saturating_sub(1_u64); + + let previous_epoch_responses = if previous_epoch == current_epoch { + // If the previous epoch and the current epoch are the same, don't bother requesting the + // previous epoch indices. + // + // In such a scenario it will be possible to detect validators but we will never update + // any of the doppelganger states. + vec![] + } else { + // Request the previous epoch liveness state from the beacon node. + beacon_nodes + .first_success(RequireSynced::Yes, |beacon_node| async move { + beacon_node + .post_lighthouse_liveness(validator_indices, previous_epoch) + .await + .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) + .map(|result| result.data) + }) + .await + .unwrap_or_else(|e| { + crit!( + log, + "Failed previous epoch liveness query"; + "error" => %e, + "previous_epoch" => %previous_epoch, + ); + // Return an empty vec. In effect, this means to keep trying to make doppelganger + // progress even if some of the calls are failing. + vec![] + }) + }; + + // Request the current epoch liveness state from the beacon node. + let current_epoch_responses = beacon_nodes + .first_success(RequireSynced::Yes, |beacon_node| async move { + beacon_node + .post_lighthouse_liveness(validator_indices, current_epoch) + .await + .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) + .map(|result| result.data) + }) + .await + .unwrap_or_else(|e| { + crit!( + log, + "Failed current epoch liveness query"; + "error" => %e, + "current_epoch" => %current_epoch, + ); + // Return an empty vec. In effect, this means to keep trying to make doppelganger + // progress even if some of the calls are failing. 
+ vec![] + }); + + // Alert the user if the beacon node is omitting validators from the response. + // + // This is not perfect since the validator might return duplicate entries, but it's a quick + // and easy way to detect issues. + if validator_indices.len() != current_epoch_responses.len() + || current_epoch_responses.len() != previous_epoch_responses.len() + { + error!( + log, + "Liveness query omitted validators"; + "previous_epoch_response" => previous_epoch_responses.len(), + "current_epoch_response" => current_epoch_responses.len(), + "requested" => validator_indices.len(), + ) + } + + LivenessResponses { + current_epoch_responses, + previous_epoch_responses, + } +} + +pub struct DoppelgangerService { + doppelganger_states: RwLock>, + log: Logger, +} + +impl DoppelgangerService { + pub fn new(log: Logger) -> Self { + Self { + doppelganger_states: <_>::default(), + log, + } + } + + /// Starts a reoccurring future which will try to keep the doppelganger service updated each + /// slot. + pub fn start_update_service( + service: Arc, + context: RuntimeContext, + validator_store: Arc>, + beacon_nodes: Arc>, + slot_clock: T, + ) -> Result<(), String> { + // Define the `get_index` function as one that uses the validator store. + let get_index = move |pubkey| validator_store.validator_index(&pubkey); + + // Define the `get_liveness` function as one that queries the beacon node API. 
+ let log = service.log.clone(); + let get_liveness = move |current_epoch, validator_indices| { + beacon_node_liveness( + beacon_nodes.clone(), + log.clone(), + current_epoch, + validator_indices, + ) + }; + + let mut shutdown_sender = context.executor.shutdown_sender(); + let log = service.log.clone(); + let mut shutdown_func = move || { + if let Err(e) = + shutdown_sender.try_send(ShutdownReason::Failure("Doppelganger detected.")) + { + crit!( + log, + "Failed to send shutdown signal"; + "msg" => "terminate this process immediately", + "error" => ?e + ); + } + }; + + info!( + service.log, + "Doppelganger detection service started"; + ); + + context.executor.spawn( + async move { + loop { + let slot_duration = slot_clock.slot_duration(); + + if let Some(duration_to_next_slot) = slot_clock.duration_to_next_slot() { + // Run the doppelganger protection check 75% through each epoch. This + // *should* mean that the BN has seen the blocks and attestations for this + // slot. + sleep(duration_to_next_slot + (slot_duration / 4) * 3).await; + } else { + // Just sleep for one slot if we are unable to read the system clock, this gives + // us an opportunity for the clock to eventually come good. + sleep(slot_duration).await; + continue; + } + + if let Some(slot) = slot_clock.now() { + if let Err(e) = service + .detect_doppelgangers::( + slot, + &get_index, + &get_liveness, + &mut shutdown_func, + ) + .await + { + error!( + service.log, + "Error during doppelganger detection"; + "error" => ?e + ); + } + } + } + }, + "doppelganger_service", + ); + Ok(()) + } + + /// Returns the current status of the `validator` in the doppelganger protection process. 
+ pub fn validator_status(&self, validator: PublicKeyBytes) -> DoppelgangerStatus { + self.doppelganger_states + .read() + .get(&validator) + .map(|v| { + if v.requires_further_checks() { + DoppelgangerStatus::SigningDisabled(validator) + } else { + DoppelgangerStatus::SigningEnabled(validator) + } + }) + .unwrap_or_else(|| { + crit!( + self.log, + "Validator unknown to doppelganger service"; + "msg" => "preventing validator from performing duties", + "pubkey" => ?validator + ); + DoppelgangerStatus::UnknownToDoppelganger(validator) + }) + } + + /// Register a new validator with the doppelganger service. + /// + /// Validators added during the genesis epoch will not have doppelganger protection applied to + /// them. + pub fn register_new_validator( + &self, + validator: PublicKeyBytes, + slot_clock: &T, + ) -> Result<(), String> { + let current_epoch = slot_clock + .now() + .ok_or_else(|| "Unable to read slot clock when registering validator".to_string())? + .epoch(E::slots_per_epoch()); + let genesis_epoch = slot_clock.genesis_slot().epoch(E::slots_per_epoch()); + + let remaining_epochs = if current_epoch <= genesis_epoch { + // Disable doppelganger protection when the validator was initialized before genesis. + // + // Without this, all validators would simply miss the first + // `DEFAULT_REMAINING_DETECTION_EPOCHS` epochs and then all start at the same time. This + // would be pointless. + // + // The downside of this is that no validators have doppelganger protection at genesis. + // It's an unfortunate trade-off. + 0 + } else { + DEFAULT_REMAINING_DETECTION_EPOCHS + }; + + let state = DoppelgangerState { + next_check_epoch: current_epoch.saturating_add(1_u64), + remaining_epochs, + }; + + self.doppelganger_states.write().insert(validator, state); + + Ok(()) + } + + /// Contact the beacon node and try to detect if there are any doppelgangers, updating the state + /// of `self`. 
+ /// + /// ## Notes + /// + /// This function is relatively complex when it comes to generic parameters. This is to allow + /// for simple unit testing. Using these generics, we can test the `DoppelgangerService` without + /// needing a BN API or a `ValidatorStore`. + async fn detect_doppelgangers( + &self, + request_slot: Slot, + get_index: &I, + get_liveness: &L, + shutdown_func: &mut S, + ) -> Result<(), String> + where + E: EthSpec, + I: Fn(PublicKeyBytes) -> Option, + L: Fn(Epoch, Vec) -> F, + F: Future, + S: FnMut(), + { + // Get all validators with active doppelganger protection. + let indices_map = self.compute_detection_indices_map(get_index); + + if indices_map.is_empty() { + // Nothing to do. + return Ok(()); + } + + // Get a list of indices to provide to the BN API. + let indices_only = indices_map.iter().map(|(index, _)| *index).collect(); + + // Pull the liveness responses from the BN. + let request_epoch = request_slot.epoch(E::slots_per_epoch()); + let liveness_responses = get_liveness(request_epoch, indices_only).await; + + // Process the responses, attempting to detect doppelgangers. + self.process_liveness_responses::( + request_slot, + liveness_responses, + &indices_map, + shutdown_func, + ) + } + + /// Get a map of `validator_index` -> `validator_pubkey` for all validators still requiring + /// further doppelganger checks. + /// + /// Any validator with an unknown index will be omitted from these results. + fn compute_detection_indices_map(&self, get_index: &F) -> HashMap + where + F: Fn(PublicKeyBytes) -> Option, + { + let detection_pubkeys = self + .doppelganger_states + .read() + .iter() + .filter_map(|(pubkey, state)| { + if state.requires_further_checks() { + Some(*pubkey) + } else { + None + } + }) + .collect::>(); + + // Maps validator indices to pubkeys. 
+ let mut indices_map = HashMap::with_capacity(detection_pubkeys.len()); + + // It is important to ensure that the `self.doppelganger_states` lock is not interleaved with + // any other locks. That is why this is a separate loop to the one that generates + // `detection_pubkeys`. + for pubkey in detection_pubkeys { + if let Some(index) = get_index(pubkey) { + indices_map.insert(index, pubkey); + } + } + + indices_map + } + + /// Process the liveness responses from the BN, potentially updating doppelganger states or + /// shutting down the VC. + fn process_liveness_responses( + &self, + request_slot: Slot, + liveness_responses: LivenessResponses, + indices_map: &HashMap, + shutdown_func: &mut S, + ) -> Result<(), String> + where + S: FnMut(), + { + let request_epoch = request_slot.epoch(E::slots_per_epoch()); + let previous_epoch = request_epoch.saturating_sub(1_u64); + let LivenessResponses { + previous_epoch_responses, + current_epoch_responses, + } = liveness_responses; + + // Perform a loop through the current and previous epoch responses and detect any violators. + // + // A following loop will update the states of each validator, depending on whether or not + // any violators were detected here. + let mut violators = vec![]; + for response in previous_epoch_responses + .iter() + .chain(current_epoch_responses.iter()) + { + if !response.is_live { + continue; + } + + // Resolve the index from the server response back to a public key. + let pubkey = if let Some(pubkey) = indices_map.get(&response.index) { + pubkey + } else { + crit!( + self.log, + "Inconsistent indices map"; + "validator_index" => response.index, + ); + // Skip this result if an inconsistency is detected. 
+ continue; + }; + + let next_check_epoch = if let Some(state) = self.doppelganger_states.read().get(pubkey) + { + state.next_check_epoch + } else { + crit!( + self.log, + "Inconsistent doppelganger state"; + "validator_pubkey" => ?pubkey, + ); + // Skip this result if an inconsistency is detected. + continue; + }; + + if response.is_live && next_check_epoch >= response.epoch { + violators.push(response.index); + } + } + + let violators_exist = !violators.is_empty(); + if violators_exist { + crit!( + self.log, + "Doppelganger(s) detected"; + "msg" => "A doppelganger occurs when two different validator clients run the \ + same public key. This validator client detected another instance of a local \ + validator on the network and is shutting down to prevent potential slashable \ + offences. Ensure that you are not running a duplicate or overlapping \ + validator client", + "doppelganger_indices" => ?violators + ) + } + + // The concept of "epoch satisfaction" is that for some epoch `e` we are *satisfied* that + // we've waited long enough such that we don't expect to see any more consensus messages + // for that epoch. + // + // As it stands now, we consider epoch `e` to be satisfied once we're in the last slot of + // epoch `e + 1`. + // + // The reasoning for this choice of satisfaction slot is that by this point we've + // *probably* seen all the blocks that are permitted to contain attestations from epoch `e`. + let previous_epoch_satisfaction_slot = previous_epoch + .saturating_add(1_u64) + .end_slot(E::slots_per_epoch()); + let previous_epoch_is_satisfied = request_slot >= previous_epoch_satisfaction_slot; + + // Iterate through all the previous epoch responses, updating `self.doppelganger_states`. + // + // Do not bother iterating through the current epoch responses since they've already been + // checked for violators and they don't result in updating the state. + for response in &previous_epoch_responses { + // Sanity check response from the server. 
+ // + // Abort the entire routine if the server starts returning junk. + if response.epoch != previous_epoch { + return Err(format!( + "beacon node returned epoch {}, expecting {}", + response.epoch, previous_epoch + )); + } + + // Resolve the index from the server response back to a public key. + let pubkey = indices_map + .get(&response.index) + // Abort the routine if inconsistency is detected. + .ok_or_else(|| { + format!( + "inconsistent indices map for validator index {}", + response.index + ) + })?; + + // Hold the lock on `self` for the rest of this function. + // + // !! IMPORTANT !! + // + // There is a write-lock being held, avoid interacting with locks until it is dropped. + let mut doppelganger_states = self.doppelganger_states.write(); + let doppelganger_state = doppelganger_states + .get_mut(pubkey) + // Abort the routine if inconsistency is detected. + .ok_or_else(|| format!("inconsistent states for validator pubkey {}", pubkey))?; + + // If a single doppelganger is detected, enable doppelganger checks on all + // validators forever (technically only 2**64 epochs). + // + // This has the effect of stopping validator activity even if the validator client + // fails to shut down. + // + // A weird side-effect is that the BN will keep getting liveness queries that will be + // ignored by the VC. Since the VC *should* shutdown anyway, this seems fine. + if violators_exist { + doppelganger_state.remaining_epochs = u64::MAX; + continue; + } + + let is_newly_satisfied_epoch = previous_epoch_is_satisfied + && previous_epoch >= doppelganger_state.next_check_epoch; + + if !response.is_live && is_newly_satisfied_epoch { + // Update the `doppelganger_state` to consider the previous epoch's checks complete. 
+ doppelganger_state.complete_detection_in_epoch(previous_epoch); + + info!( + self.log, + "Found no doppelganger"; + "further_checks_remaining" => doppelganger_state.remaining_epochs, + "epoch" => response.index, + "validator_index" => response.index + ); + + if doppelganger_state.remaining_epochs == 0 { + info!( + self.log, + "Doppelganger detection complete"; + "msg" => "starting validator", + "validator_index" => response.index + ); + } + } + } + + // Attempt to shutdown the validator client if there are any detected duplicate validators. + if violators_exist { + shutdown_func(); + } + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use environment::null_logger; + use futures::executor::block_on; + use slot_clock::TestingSlotClock; + use std::collections::HashSet; + use std::future; + use std::time::Duration; + use types::{ + test_utils::{SeedableRng, TestRandom, XorShiftRng}, + MainnetEthSpec, + }; + + const DEFAULT_VALIDATORS: usize = 8; + + type E = MainnetEthSpec; + + fn genesis_epoch() -> Epoch { + E::default_spec().genesis_slot.epoch(E::slots_per_epoch()) + } + + fn check_detection_indices(detection_indices: &[u64]) { + assert_eq!( + detection_indices.iter().copied().collect::>(), + (0..DEFAULT_VALIDATORS as u64).collect::>(), + "all validators should be included in detection indices" + ); + } + + struct TestBuilder { + validator_count: usize, + } + + impl Default for TestBuilder { + fn default() -> Self { + Self { + validator_count: DEFAULT_VALIDATORS, + } + } + } + + impl TestBuilder { + fn build(self) -> TestScenario { + let mut rng = XorShiftRng::from_seed([42; 16]); + let slot_clock = + TestingSlotClock::new(Slot::new(0), Duration::from_secs(0), Duration::from_secs(1)); + let log = null_logger().unwrap(); + + TestScenario { + validators: (0..self.validator_count) + .map(|_| PublicKeyBytes::random_for_test(&mut rng)) + .collect(), + doppelganger: DoppelgangerService::new(log), + slot_clock, + } + } + } + + struct TestScenario { + 
validators: Vec, + doppelganger: DoppelgangerService, + slot_clock: TestingSlotClock, + } + + impl TestScenario { + pub fn pubkey_to_index_map(&self) -> HashMap { + self.validators + .iter() + .enumerate() + .map(|(index, pubkey)| (*pubkey, index as u64)) + .collect() + } + + pub fn set_slot(self, slot: Slot) -> Self { + self.slot_clock.set_slot(slot.into()); + self + } + + pub fn register_all_in_doppelganger_protection_if_enabled(self) -> Self { + let mut this = self; + for i in 0..this.validators.len() { + this = this.register_validator(i as u64); + } + this + } + + pub fn register_validators(self, validators: &[u64]) -> Self { + let mut this = self; + for i in validators { + this = this.register_validator(*i); + } + this + } + + pub fn register_validator(self, index: u64) -> Self { + let pubkey = *self + .validators + .get(index as usize) + .expect("index should exist"); + + self.doppelganger + .register_new_validator::(pubkey, &self.slot_clock) + .unwrap(); + self.doppelganger + .doppelganger_states + .read() + .get(&pubkey) + .expect("validator should be registered"); + + self + } + + pub fn assert_all_enabled(self) -> Self { + /* + * 1. Ensure all validators have the correct status. + */ + for validator in &self.validators { + assert_eq!( + self.doppelganger.validator_status(*validator), + DoppelgangerStatus::SigningEnabled(*validator), + "all validators should be enabled" + ); + } + + /* + * 2. Ensure a correct detection indices map is generated. + */ + let pubkey_to_index = self.pubkey_to_index_map(); + let generated_map = self + .doppelganger + .compute_detection_indices_map(&|pubkey| pubkey_to_index.get(&pubkey).copied()); + assert!( + generated_map.is_empty(), + "there should be no indices for detection if all validators are enabled" + ); + + self + } + + pub fn assert_all_disabled(self) -> Self { + /* + * 1. Ensure all validators have the correct status. 
+ */ + for validator in &self.validators { + assert_eq!( + self.doppelganger.validator_status(*validator), + DoppelgangerStatus::SigningDisabled(*validator), + "all validators should be disabled" + ); + } + + /* + * 2. Ensure a correct detection indices map is generated. + */ + let pubkey_to_index = self.pubkey_to_index_map(); + let generated_map = self + .doppelganger + .compute_detection_indices_map(&|pubkey| pubkey_to_index.get(&pubkey).copied()); + + assert_eq!( + pubkey_to_index.len(), + generated_map.len(), + "should declare all indices for detection" + ); + for (pubkey, index) in pubkey_to_index { + assert_eq!( + generated_map.get(&index), + Some(&pubkey), + "map should be consistent" + ); + } + + self + } + + pub fn assert_all_states(self, state: &DoppelgangerState) -> Self { + let mut this = self; + for i in 0..this.validators.len() { + this = this.assert_state(i as u64, state); + } + this + } + + pub fn assert_state(self, index: u64, state: &DoppelgangerState) -> Self { + let pubkey = *self + .validators + .get(index as usize) + .expect("index should exist"); + + assert_eq!( + self.doppelganger + .doppelganger_states + .read() + .get(&pubkey) + .expect("validator should be present"), + state, + "validator should match provided state" + ); + + self + } + + pub fn assert_unregistered(self, index: u64) -> Self { + let pubkey = *self + .validators + .get(index as usize) + .expect("index should exist in test scenario"); + + assert!( + self.doppelganger + .doppelganger_states + .read() + .get(&pubkey) + .is_none(), + "validator should not be present in states" + ); + + assert_eq!( + self.doppelganger.validator_status(pubkey), + DoppelgangerStatus::UnknownToDoppelganger(pubkey), + "validator status should be unknown" + ); + + self + } + } + + #[test] + fn enabled_in_genesis_epoch() { + for slot in genesis_epoch().slot_iter(E::slots_per_epoch()) { + TestBuilder::default() + .build() + .set_slot(slot) + .register_all_in_doppelganger_protection_if_enabled() + 
.assert_all_enabled() + .assert_all_states(&DoppelgangerState { + next_check_epoch: genesis_epoch() + 1, + remaining_epochs: 0, + }); + } + } + + #[test] + fn disabled_after_genesis_epoch() { + let epoch = genesis_epoch() + 1; + + for slot in epoch.slot_iter(E::slots_per_epoch()) { + TestBuilder::default() + .build() + .set_slot(slot) + .register_all_in_doppelganger_protection_if_enabled() + .assert_all_disabled() + .assert_all_states(&DoppelgangerState { + next_check_epoch: epoch + 1, + remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS, + }); + } + } + + #[test] + fn unregistered_validator() { + // Non-genesis epoch + let epoch = genesis_epoch() + 2; + + TestBuilder::default() + .build() + .set_slot(epoch.start_slot(E::slots_per_epoch())) + // Register only validator 1. + .register_validator(1) + // Ensure validator 1 was registered. + .assert_state( + 1, + &DoppelgangerState { + next_check_epoch: epoch + 1, + remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS, + }, + ) + // Ensure validator 2 was not registered. + .assert_unregistered(2); + } + + enum ShouldShutdown { + Yes, + No, + } + + fn get_false_responses(current_epoch: Epoch, detection_indices: &[u64]) -> LivenessResponses { + LivenessResponses { + current_epoch_responses: detection_indices + .iter() + .map(|i| LivenessResponseData { + index: *i as u64, + epoch: current_epoch, + is_live: false, + }) + .collect(), + previous_epoch_responses: detection_indices + .iter() + .map(|i| LivenessResponseData { + index: *i as u64, + epoch: current_epoch - 1, + is_live: false, + }) + .collect(), + } + } + + impl TestScenario { + pub fn simulate_detect_doppelgangers( + self, + slot: Slot, + should_shutdown: ShouldShutdown, + get_liveness: L, + ) -> Self + where + L: Fn(Epoch, Vec) -> F, + F: Future, + { + // Create a simulated shutdown sender. + let mut did_shutdown = false; + let mut shutdown_func = || did_shutdown = true; + + // Create a simulated validator store that can resolve pubkeys to indices. 
+ let pubkey_to_index = self.pubkey_to_index_map(); + let get_index = |pubkey| pubkey_to_index.get(&pubkey).copied(); + + block_on(self.doppelganger.detect_doppelgangers::( + slot, + &get_index, + &get_liveness, + &mut shutdown_func, + )) + .expect("detection should not error"); + + match should_shutdown { + ShouldShutdown::Yes if !did_shutdown => panic!("vc failed to shutdown"), + ShouldShutdown::No if did_shutdown => panic!("vc shutdown when it shouldn't"), + _ => (), + } + + self + } + } + + #[test] + fn detect_at_genesis() { + let epoch = genesis_epoch(); + let slot = epoch.start_slot(E::slots_per_epoch()); + + TestBuilder::default() + .build() + .set_slot(slot) + .register_all_in_doppelganger_protection_if_enabled() + // All validators should have signing enabled since it's the genesis epoch. + .assert_all_enabled() + .simulate_detect_doppelgangers( + slot, + ShouldShutdown::No, + |_, _| { + panic!("the beacon node should not get a request if there are no doppelganger validators"); + + // The compiler needs this, otherwise it complains that this isn't a future. + #[allow(unreachable_code)] + future::ready(get_false_responses(Epoch::new(0), &[])) + }, + ) + // All validators should be enabled. + .assert_all_enabled(); + } + + fn detect_after_genesis_test(mutate_responses: F) + where + F: Fn(&mut LivenessResponses), + { + let epoch = genesis_epoch() + 1; + let slot = epoch.start_slot(E::slots_per_epoch()); + + TestBuilder::default() + .build() + .set_slot(slot) + .register_all_in_doppelganger_protection_if_enabled() + .assert_all_disabled() + // First, simulate a check where there are no doppelgangers. 
+ .simulate_detect_doppelgangers( + slot, + ShouldShutdown::No, + |current_epoch, detection_indices: Vec<_>| { + assert_eq!(current_epoch, epoch); + check_detection_indices(&detection_indices); + + let liveness_responses = get_false_responses(current_epoch, &detection_indices); + + future::ready(liveness_responses) + }, + ) + // All validators should be disabled since they started after genesis. + .assert_all_disabled() + // Now, simulate a check where we apply `mutate_responses` which *must* create some + // doppelgangers. + .simulate_detect_doppelgangers( + // Perform this check in the next slot. + slot + 1, + ShouldShutdown::Yes, + |current_epoch, detection_indices: Vec<_>| { + assert_eq!(current_epoch, epoch); + check_detection_indices(&detection_indices); + + let mut liveness_responses = + get_false_responses(current_epoch, &detection_indices); + + mutate_responses(&mut liveness_responses); + + future::ready(liveness_responses) + }, + ) + // All validators should still be disabled. + .assert_all_disabled() + // The states of all validators should be jammed with `u64::max_value()`. 
+ .assert_all_states(&DoppelgangerState { + next_check_epoch: epoch + 1, + remaining_epochs: u64::MAX, + }); + } + + #[test] + fn detect_after_genesis_with_current_epoch_doppelganger() { + detect_after_genesis_test(|liveness_responses| { + liveness_responses.current_epoch_responses[0].is_live = true + }) + } + + #[test] + fn detect_after_genesis_with_previous_epoch_doppelganger() { + detect_after_genesis_test(|liveness_responses| { + liveness_responses.previous_epoch_responses[0].is_live = true + }) + } + + #[test] + fn no_doppelgangers_for_adequate_time() { + let initial_epoch = genesis_epoch() + 42; + let initial_slot = initial_epoch.start_slot(E::slots_per_epoch()); + let activation_slot = + (initial_epoch + DEFAULT_REMAINING_DETECTION_EPOCHS + 1).end_slot(E::slots_per_epoch()); + + let mut scenario = TestBuilder::default() + .build() + .set_slot(initial_slot) + .register_all_in_doppelganger_protection_if_enabled() + .assert_all_disabled(); + + for slot in initial_slot.as_u64()..=activation_slot.as_u64() { + let slot = Slot::new(slot); + let epoch = slot.epoch(E::slots_per_epoch()); + + scenario = scenario.simulate_detect_doppelgangers( + slot, + ShouldShutdown::No, + |current_epoch, detection_indices: Vec<_>| { + assert_eq!(current_epoch, epoch); + check_detection_indices(&detection_indices); + + let liveness_responses = get_false_responses(current_epoch, &detection_indices); + + future::ready(liveness_responses) + }, + ); + + let is_first_epoch = epoch == initial_epoch; + let is_second_epoch = epoch == initial_epoch + 1; + let is_satisfaction_slot = slot == epoch.end_slot(E::slots_per_epoch()); + let epochs_since_start = epoch.as_u64().checked_sub(initial_epoch.as_u64()).unwrap(); + + let expected_state = if is_first_epoch || is_second_epoch { + DoppelgangerState { + next_check_epoch: initial_epoch + 1, + remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS, + } + } else if !is_satisfaction_slot { + DoppelgangerState { + next_check_epoch: epoch - 1, + 
remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS + .saturating_sub(epochs_since_start.saturating_sub(2)), + } + } else { + DoppelgangerState { + next_check_epoch: epoch, + remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS + .saturating_sub(epochs_since_start.saturating_sub(1)), + } + }; + + scenario = scenario.assert_all_states(&expected_state); + + scenario = if slot < activation_slot { + scenario.assert_all_disabled() + } else { + scenario.assert_all_enabled() + }; + } + + scenario + .assert_all_enabled() + .assert_all_states(&DoppelgangerState { + next_check_epoch: activation_slot.epoch(E::slots_per_epoch()), + remaining_epochs: 0, + }); + } + + #[test] + fn time_skips_forward() { + let initial_epoch = genesis_epoch() + 1; + let initial_slot = initial_epoch.start_slot(E::slots_per_epoch()); + let skipped_forward_epoch = initial_epoch + 42; + let skipped_forward_slot = skipped_forward_epoch.end_slot(E::slots_per_epoch()); + + TestBuilder::default() + .build() + .set_slot(initial_slot) + .register_all_in_doppelganger_protection_if_enabled() + .assert_all_disabled() + // First, simulate a check in the initialization epoch. 
+ .simulate_detect_doppelgangers( + initial_slot, + ShouldShutdown::No, + |current_epoch, detection_indices: Vec<_>| { + assert_eq!(current_epoch, initial_epoch); + check_detection_indices(&detection_indices); + + future::ready(get_false_responses(current_epoch, &detection_indices)) + }, + ) + .assert_all_disabled() + .assert_all_states(&DoppelgangerState { + next_check_epoch: initial_epoch + 1, + remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS, + }) + // Simulate a check in the skipped forward slot + .simulate_detect_doppelgangers( + skipped_forward_slot, + ShouldShutdown::No, + |current_epoch, detection_indices: Vec<_>| { + assert_eq!(current_epoch, skipped_forward_epoch); + check_detection_indices(&detection_indices); + + future::ready(get_false_responses(current_epoch, &detection_indices)) + }, + ) + .assert_all_states(&DoppelgangerState { + next_check_epoch: skipped_forward_epoch, + remaining_epochs: 0, + }); + } + + #[test] + fn time_skips_backward() { + let initial_epoch = genesis_epoch() + 42; + let initial_slot = initial_epoch.start_slot(E::slots_per_epoch()); + let skipped_backward_epoch = initial_epoch - 12; + let skipped_backward_slot = skipped_backward_epoch.end_slot(E::slots_per_epoch()); + + TestBuilder::default() + .build() + .set_slot(initial_slot) + .register_all_in_doppelganger_protection_if_enabled() + .assert_all_disabled() + // First, simulate a check in the initialization epoch. 
+ .simulate_detect_doppelgangers( + initial_slot, + ShouldShutdown::No, + |current_epoch, detection_indices: Vec<_>| { + assert_eq!(current_epoch, initial_epoch); + check_detection_indices(&detection_indices); + + future::ready(get_false_responses(current_epoch, &detection_indices)) + }, + ) + .assert_all_disabled() + .assert_all_states(&DoppelgangerState { + next_check_epoch: initial_epoch + 1, + remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS, + }) + // Simulate a check in the skipped backward slot + .simulate_detect_doppelgangers( + skipped_backward_slot, + ShouldShutdown::No, + |current_epoch, detection_indices: Vec<_>| { + assert_eq!(current_epoch, skipped_backward_epoch); + check_detection_indices(&detection_indices); + + future::ready(get_false_responses(current_epoch, &detection_indices)) + }, + ) + .assert_all_disabled() + // When time skips backward we should *not* allow doppelganger advancement. + .assert_all_states(&DoppelgangerState { + next_check_epoch: initial_epoch + 1, + remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS, + }); + } + + #[test] + fn staggered_entry() { + let early_epoch = genesis_epoch() + 42; + let early_slot = early_epoch.start_slot(E::slots_per_epoch()); + let early_activation_slot = + (early_epoch + DEFAULT_REMAINING_DETECTION_EPOCHS + 1).end_slot(E::slots_per_epoch()); + + let late_epoch = early_epoch + 1; + let late_slot = late_epoch.start_slot(E::slots_per_epoch()); + let late_activation_slot = + (late_epoch + DEFAULT_REMAINING_DETECTION_EPOCHS + 1).end_slot(E::slots_per_epoch()); + + let early_validators: Vec = (0..DEFAULT_VALIDATORS as u64 / 2).collect(); + let late_validators: Vec = + (DEFAULT_VALIDATORS as u64 / 2..DEFAULT_VALIDATORS as u64).collect(); + + let mut scenario = TestBuilder::default() + .build() + .set_slot(early_slot) + .register_validators(&early_validators) + .set_slot(late_slot) + .register_validators(&late_validators) + .assert_all_disabled(); + + for slot in 
early_slot.as_u64()..=late_activation_slot.as_u64() { + let slot = Slot::new(slot); + + scenario = scenario.simulate_detect_doppelgangers( + slot, + ShouldShutdown::No, + |current_epoch, detection_indices: Vec<_>| { + future::ready(get_false_responses(current_epoch, &detection_indices)) + }, + ); + + for index in 0..DEFAULT_VALIDATORS as u64 { + let pubkey = *scenario.validators.get(index as usize).unwrap(); + + let should_be_disabled = if early_validators.contains(&index) { + slot < early_activation_slot + } else if late_validators.contains(&index) { + slot < late_activation_slot + } else { + unreachable!("inconsistent test"); + }; + + if should_be_disabled { + assert_eq!( + scenario.doppelganger.validator_status(pubkey), + DoppelgangerStatus::SigningDisabled(pubkey) + ) + } else { + assert_eq!( + scenario.doppelganger.validator_status(pubkey), + DoppelgangerStatus::SigningEnabled(pubkey) + ) + } + } + } + + scenario.assert_all_enabled(); + } +} diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 584a88194d..1a667d1cb2 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -8,7 +8,9 @@ use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; use crate::{ - block_service::BlockServiceNotification, http_metrics::metrics, validator_store::ValidatorStore, + block_service::BlockServiceNotification, + http_metrics::metrics, + validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore}, }; use environment::RuntimeContext; use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId}; @@ -36,7 +38,7 @@ const HISTORICAL_DUTIES_EPOCHS: u64 = 2; pub enum Error { UnableToReadSlotClock, FailedToDownloadAttesters(String), - FailedToProduceSelectionProof, + FailedToProduceSelectionProof(ValidatorStoreError), InvalidModulo(ArithError), } @@ -56,8 +58,8 @@ impl DutyAndProof { spec: &ChainSpec, ) -> Result { let 
selection_proof = validator_store - .produce_selection_proof(&duty.pubkey, duty.slot) - .ok_or(Error::FailedToProduceSelectionProof)?; + .produce_selection_proof(duty.pubkey, duty.slot) + .map_err(Error::FailedToProduceSelectionProof)?; let selection_proof = selection_proof .is_aggregator(duty.committee_length as usize, spec) @@ -84,7 +86,6 @@ type DependentRoot = Hash256; type AttesterMap = HashMap>; type ProposerMap = HashMap)>; -type IndicesMap = HashMap; /// See the module-level documentation. pub struct DutiesService { @@ -93,11 +94,8 @@ pub struct DutiesService { /// Maps an epoch to all *local* proposers in this epoch. Notably, this does not contain /// proposals for any validators which are not registered locally. pub proposers: RwLock, - /// Maps a public key to a validator index. There is a task which ensures this map is kept - /// up-to-date. - pub indices: RwLock, /// Provides the canonical list of locally-managed validators. - pub validator_store: ValidatorStore, + pub validator_store: Arc>, /// Tracks the current slot. pub slot_clock: T, /// Provides HTTP access to remote beacon nodes. @@ -119,21 +117,44 @@ impl DutiesService { /// Returns the total number of validators that should propose in the given epoch. pub fn proposer_count(&self, epoch: Epoch) -> usize { + // Only collect validators that are considered safe in terms of doppelganger protection. + let signing_pubkeys: HashSet<_> = self + .validator_store + .voting_pubkeys(DoppelgangerStatus::only_safe); + self.proposers .read() .get(&epoch) - .map_or(0, |(_, proposers)| proposers.len()) + .map_or(0, |(_, proposers)| { + proposers + .iter() + .filter(|proposer_data| signing_pubkeys.contains(&proposer_data.pubkey)) + .count() + }) } /// Returns the total number of validators that should attest in the given epoch. pub fn attester_count(&self, epoch: Epoch) -> usize { + // Only collect validators that are considered safe in terms of doppelganger protection. 
+ let signing_pubkeys: HashSet<_> = self + .validator_store + .voting_pubkeys(DoppelgangerStatus::only_safe); self.attesters .read() .iter() - .filter(|(_, map)| map.contains_key(&epoch)) + .filter_map(|(_, map)| map.get(&epoch)) + .map(|(_, duty_and_proof)| duty_and_proof) + .filter(|duty_and_proof| signing_pubkeys.contains(&duty_and_proof.duty.pubkey)) .count() } + /// Returns the total number of validators that are in a doppelganger detection period. + pub fn doppelganger_detecting_count(&self) -> usize { + self.validator_store + .voting_pubkeys::, _>(DoppelgangerStatus::only_unsafe) + .len() + } + /// Returns the pubkeys of the validators which are assigned to propose in the given slot. /// /// It is possible that multiple validators have an identical proposal slot, however that is @@ -141,13 +162,21 @@ impl DutiesService { pub fn block_proposers(&self, slot: Slot) -> HashSet { let epoch = slot.epoch(E::slots_per_epoch()); + // Only collect validators that are considered safe in terms of doppelganger protection. + let signing_pubkeys: HashSet<_> = self + .validator_store + .voting_pubkeys(DoppelgangerStatus::only_safe); + self.proposers .read() .get(&epoch) .map(|(_, proposers)| { proposers .iter() - .filter(|proposer_data| proposer_data.slot == slot) + .filter(|proposer_data| { + proposer_data.slot == slot + && signing_pubkeys.contains(&proposer_data.pubkey) + }) .map(|proposer_data| proposer_data.pubkey) .collect() }) @@ -158,12 +187,20 @@ impl DutiesService { pub fn attesters(&self, slot: Slot) -> Vec { let epoch = slot.epoch(E::slots_per_epoch()); + // Only collect validators that are considered safe in terms of doppelganger protection. 
+ let signing_pubkeys: HashSet<_> = self + .validator_store + .voting_pubkeys(DoppelgangerStatus::only_safe); + self.attesters .read() .iter() .filter_map(|(_, map)| map.get(&epoch)) .map(|(_, duty_and_proof)| duty_and_proof) - .filter(|duty_and_proof| duty_and_proof.duty.slot == slot) + .filter(|duty_and_proof| { + duty_and_proof.duty.slot == slot + && signing_pubkeys.contains(&duty_and_proof.duty.pubkey) + }) .cloned() .collect() } @@ -276,9 +313,23 @@ async fn poll_validator_indices( metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::UPDATE_INDICES]); let log = duties_service.context.log(); - for pubkey in duties_service.validator_store.voting_pubkeys() { + + // Collect *all* pubkeys for resolving indices, even those undergoing doppelganger protection. + // + // Since doppelganger protection queries rely on validator indices it is important to ensure we + // collect those indices. + let all_pubkeys: Vec<_> = duties_service + .validator_store + .voting_pubkeys(DoppelgangerStatus::ignored); + + for pubkey in all_pubkeys { // This is on its own line to avoid some weirdness with locks and if statements. - let is_known = duties_service.indices.read().contains_key(&pubkey); + let is_known = duties_service + .validator_store + .initialized_validators() + .read() + .get_index(&pubkey) + .is_some(); if !is_known { // Query the remote BN to resolve a pubkey to a validator index. @@ -307,9 +358,10 @@ async fn poll_validator_indices( "validator_index" => response.data.index ); duties_service - .indices + .validator_store + .initialized_validators() .write() - .insert(pubkey, response.data.index); + .set_index(&pubkey, response.data.index); } // This is not necessarily an error, it just means the validator is not yet known to // the beacon chain. 
@@ -359,18 +411,22 @@ async fn poll_beacon_attesters( let current_epoch = current_slot.epoch(E::slots_per_epoch()); let next_epoch = current_epoch + 1; - let local_pubkeys: HashSet = duties_service + // Collect *all* pubkeys, even those undergoing doppelganger protection. + // + // We must know the duties for doppelganger validators so that we can subscribe to their subnets + // and get more information about other running instances. + let local_pubkeys: HashSet<_> = duties_service .validator_store - .voting_pubkeys() - .into_iter() - .collect(); + .voting_pubkeys(DoppelgangerStatus::ignored); let local_indices = { let mut local_indices = Vec::with_capacity(local_pubkeys.len()); - let indices_map = duties_service.indices.read(); + + let vals_ref = duties_service.validator_store.initialized_validators(); + let vals = vals_ref.read(); for &pubkey in &local_pubkeys { - if let Some(validator_index) = indices_map.get(&pubkey) { - local_indices.push(*validator_index) + if let Some(validator_index) = vals.get_index(&pubkey) { + local_indices.push(validator_index) } } local_indices @@ -636,15 +692,18 @@ async fn poll_beacon_proposers( current_slot, &initial_block_proposers, block_service_tx, + &duties_service.validator_store, log, ) .await; - let local_pubkeys: HashSet = duties_service + // Collect *all* pubkeys, even those undergoing doppelganger protection. + // + // It is useful to keep the duties for all validators around, so they're on hand when + // doppelganger finishes. + let local_pubkeys: HashSet<_> = duties_service .validator_store - .voting_pubkeys() - .into_iter() - .collect(); + .voting_pubkeys(DoppelgangerStatus::ignored); // Only download duties and push out additional block production events if we have some // validators. 
@@ -723,6 +782,7 @@ async fn poll_beacon_proposers( current_slot, &additional_block_producers, block_service_tx, + &duties_service.validator_store, log, ) .await; @@ -745,24 +805,33 @@ async fn poll_beacon_proposers( } /// Notify the block service if it should produce a block. -async fn notify_block_production_service( +async fn notify_block_production_service( current_slot: Slot, block_proposers: &HashSet, block_service_tx: &mut Sender, + validator_store: &ValidatorStore, log: &Logger, ) { - if let Err(e) = block_service_tx - .send(BlockServiceNotification { - slot: current_slot, - block_proposers: block_proposers.iter().copied().collect(), - }) - .await - { - error!( - log, - "Failed to notify block service"; - "current_slot" => current_slot, - "error" => %e - ); - }; + let non_doppelganger_proposers = block_proposers + .iter() + .filter(|pubkey| validator_store.doppelganger_protection_allows_signing(**pubkey)) + .copied() + .collect::>(); + + if !non_doppelganger_proposers.is_empty() { + if let Err(e) = block_service_tx + .send(BlockServiceNotification { + slot: current_slot, + block_proposers: non_doppelganger_proposers, + }) + .await + { + error!( + log, + "Failed to notify block service"; + "current_slot" => current_slot, + "error" => %e + ); + }; + } } diff --git a/validator_client/src/fork_service.rs b/validator_client/src/fork_service.rs index 2c2df187b2..f5d39e397c 100644 --- a/validator_client/src/fork_service.rs +++ b/validator_client/src/fork_service.rs @@ -137,6 +137,11 @@ impl ForkService { *self.fork.read() } + /// Returns the slot clock. + pub fn slot_clock(&self) -> T { + self.slot_clock.clone() + } + /// Starts the service that periodically polls for the `Fork`. pub fn start_update_service(self, context: &RuntimeContext) -> Result<(), String> { // Run an immediate update before starting the updater service. 
diff --git a/validator_client/src/http_api/mod.rs b/validator_client/src/http_api/mod.rs index bc820ce44e..87d8ae3135 100644 --- a/validator_client/src/http_api/mod.rs +++ b/validator_client/src/http_api/mod.rs @@ -50,10 +50,10 @@ impl From for Error { /// A wrapper around all the items required to spawn the HTTP server. /// /// The server will gracefully handle the case where any fields are `None`. -pub struct Context { +pub struct Context { pub runtime: Weak, pub api_secret: ApiSecret, - pub validator_store: Option>, + pub validator_store: Option>>, pub validator_dir: Option, pub spec: ChainSpec, pub config: Config, @@ -203,7 +203,7 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and_then(|validator_store: ValidatorStore, signer| { + .and_then(|validator_store: Arc>, signer| { blocking_signed_json_task(signer, move || { let validators = validator_store .initialized_validators() @@ -229,7 +229,7 @@ pub fn serve( .and(validator_store_filter.clone()) .and(signer.clone()) .and_then( - |validator_pubkey: PublicKey, validator_store: ValidatorStore, signer| { + |validator_pubkey: PublicKey, validator_store: Arc>, signer| { blocking_signed_json_task(signer, move || { let validator = validator_store .initialized_validators() @@ -267,7 +267,7 @@ pub fn serve( .and_then( |body: Vec, validator_dir: PathBuf, - validator_store: ValidatorStore, + validator_store: Arc>, spec: Arc, signer, runtime: Weak| { @@ -309,7 +309,7 @@ pub fn serve( .and_then( |body: api_types::CreateValidatorsMnemonicRequest, validator_dir: PathBuf, - validator_store: ValidatorStore, + validator_store: Arc>, spec: Arc, signer, runtime: Weak| { @@ -353,7 +353,7 @@ pub fn serve( .and_then( |body: api_types::KeystoreValidatorsPostRequest, validator_dir: PathBuf, - validator_store: ValidatorStore, + validator_store: Arc>, signer, runtime: Weak| { blocking_signed_json_task(signer, move || { @@ -428,7 +428,7 @@ pub fn serve( .and_then( |validator_pubkey: 
PublicKey, body: api_types::ValidatorPatchRequest, - validator_store: ValidatorStore, + validator_store: Arc>, signer, runtime: Weak| { blocking_signed_json_task(signer, move || { diff --git a/validator_client/src/http_api/tests.rs b/validator_client/src/http_api/tests.rs index cf2618bba1..fd8b2b9e73 100644 --- a/validator_client/src/http_api/tests.rs +++ b/validator_client/src/http_api/tests.rs @@ -1,6 +1,7 @@ #![cfg(test)] #![cfg(not(debug_assertions))] +use crate::doppelganger_service::DoppelgangerService; use crate::{ http_api::{ApiSecret, Config as HttpConfig, Context}, Config, ForkServiceBuilder, InitializedValidators, ValidatorDefinitions, ValidatorStore, @@ -85,16 +86,21 @@ impl ApiTester { Hash256::repeat_byte(42), spec, fork_service.clone(), + Some(Arc::new(DoppelgangerService::new(log.clone()))), log.clone(), ); + validator_store + .register_all_in_doppelganger_protection_if_enabled() + .expect("Should attach doppelganger service"); + let initialized_validators = validator_store.initialized_validators(); let context: Arc> = Arc::new(Context { runtime, api_secret, validator_dir: Some(validator_dir.path().into()), - validator_store: Some(validator_store), + validator_store: Some(Arc::new(validator_store)), spec: E::default_spec(), config: HttpConfig { enabled: true, diff --git a/validator_client/src/http_metrics/mod.rs b/validator_client/src/http_metrics/mod.rs index bb80e20f43..fcf98987ad 100644 --- a/validator_client/src/http_metrics/mod.rs +++ b/validator_client/src/http_metrics/mod.rs @@ -35,7 +35,7 @@ impl From for Error { /// Contains objects which have shared access from inside/outside of the metrics server. 
pub struct Shared { - pub validator_store: Option>, + pub validator_store: Option>>, pub duties_service: Option>>, pub genesis_time: Option, } diff --git a/validator_client/src/initialized_validators.rs b/validator_client/src/initialized_validators.rs index 4962ffdcad..4e9bbef76f 100644 --- a/validator_client/src/initialized_validators.rs +++ b/validator_client/src/initialized_validators.rs @@ -62,6 +62,10 @@ pub enum Error { TokioJoin(tokio::task::JoinError), /// Cannot initialize the same validator twice. DuplicatePublicKey, + /// The public key does not exist in the set of initialized validators. + ValidatorNotInitialized(PublicKey), + /// Unable to read the slot clock. + SlotClock, } impl From for Error { @@ -88,6 +92,8 @@ pub enum SigningMethod { pub struct InitializedValidator { signing_method: SigningMethod, graffiti: Option, + /// The validators index in `state.validators`, to be updated by an external service. + index: Option, } impl InitializedValidator { @@ -212,6 +218,7 @@ impl InitializedValidator { voting_keypair, }, graffiti: def.graffiti.map(Into::into), + index: None, }) } } @@ -313,7 +320,7 @@ impl InitializedValidators { self.definitions.as_slice().len() } - /// Iterate through all **enabled** voting public keys in `self`. + /// Iterate through all voting public keys in `self` that should be used when querying for duties. 
pub fn iter_voting_pubkeys(&self) -> impl Iterator { self.validators.iter().map(|(pubkey, _)| pubkey) } @@ -622,4 +629,14 @@ impl InitializedValidators { ); Ok(()) } + + pub fn get_index(&self, pubkey: &PublicKeyBytes) -> Option { + self.validators.get(pubkey).and_then(|val| val.index) + } + + pub fn set_index(&mut self, pubkey: &PublicKeyBytes, index: u64) { + if let Some(val) = self.validators.get_mut(pubkey) { + val.index = Some(index); + } + } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index c7b2b3599e..be9a27db7b 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -13,6 +13,7 @@ mod key_cache; mod notifier; mod validator_store; +mod doppelganger_service; pub mod http_api; pub use cli::cli_app; @@ -23,6 +24,7 @@ use monitoring_api::{MonitoringHttpClient, ProcessType}; use crate::beacon_node_fallback::{ start_fallback_updater_service, BeaconNodeFallback, CandidateBeaconNode, RequireSynced, }; +use crate::doppelganger_service::DoppelgangerService; use account_utils::validator_definitions::ValidatorDefinitions; use attestation_service::{AttestationService, AttestationServiceBuilder}; use block_service::{BlockService, BlockServiceBuilder}; @@ -61,9 +63,12 @@ const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12); /// This can help ensure that proper endpoint fallback occurs. 
const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; +const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2; const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; +const DOPPELGANGER_SERVICE_NAME: &str = "doppelganger"; + #[derive(Clone)] pub struct ProductionValidatorClient { context: RuntimeContext, @@ -71,7 +76,8 @@ pub struct ProductionValidatorClient { fork_service: ForkService, block_service: BlockService, attestation_service: AttestationService, - validator_store: ValidatorStore, + doppelganger_service: Option>, + validator_store: Arc>, http_api_listen_addr: Option, http_metrics_ctx: Option>>, config: Config, @@ -254,6 +260,7 @@ impl ProductionValidatorClient { Timeouts { attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT, attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT, + liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT, proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT, proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT, } @@ -313,14 +320,27 @@ impl ProductionValidatorClient { .log(log.clone()) .build()?; - let validator_store: ValidatorStore = ValidatorStore::new( - validators, - slashing_protection, - genesis_validators_root, - context.eth2_config.spec.clone(), - fork_service.clone(), - log.clone(), - ); + let doppelganger_service = if config.enable_doppelganger_protection { + Some(Arc::new(DoppelgangerService::new( + context + .service_context(DOPPELGANGER_SERVICE_NAME.into()) + .log() + .clone(), + ))) + } else { + None + }; + + let validator_store: Arc> = + Arc::new(ValidatorStore::new( + validators, + slashing_protection, + genesis_validators_root, + context.eth2_config.spec.clone(), + fork_service.clone(), + doppelganger_service.clone(), + log.clone(), + )); info!( log, @@ -339,7 +359,6 @@ impl ProductionValidatorClient { let duties_service = Arc::new(DutiesService { attesters: 
<_>::default(), proposers: <_>::default(), - indices: <_>::default(), slot_clock: slot_clock.clone(), beacon_nodes: beacon_nodes.clone(), validator_store: validator_store.clone(), @@ -369,7 +388,7 @@ impl ProductionValidatorClient { let attestation_service = AttestationServiceBuilder::new() .duties_service(duties_service.clone()) - .slot_clock(slot_clock) + .slot_clock(slot_clock.clone()) .validator_store(validator_store.clone()) .beacon_nodes(beacon_nodes.clone()) .runtime_context(context.service_context("attestation".into())) @@ -381,12 +400,16 @@ impl ProductionValidatorClient { // of making too many changes this close to genesis (<1 week). wait_for_genesis(&beacon_nodes, genesis_time, &context).await?; + // Ensure all validators are registered in doppelganger protection. + validator_store.register_all_in_doppelganger_protection_if_enabled()?; + Ok(Self { context, duties_service, fork_service, block_service, attestation_service, + doppelganger_service, validator_store, config, http_api_listen_addr: None, @@ -419,6 +442,20 @@ impl ProductionValidatorClient { .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start attestation service: {}", e))?; + if let Some(doppelganger_service) = self.doppelganger_service.clone() { + DoppelgangerService::start_update_service( + doppelganger_service, + self.context + .service_context(DOPPELGANGER_SERVICE_NAME.into()), + self.validator_store.clone(), + self.duties_service.beacon_nodes.clone(), + self.duties_service.slot_clock.clone(), + ) + .map_err(|e| format!("Unable to start doppelganger service: {}", e))? 
+ } else { + info!(log, "Doppelganger protection disabled.") + } + spawn_notifier(self).map_err(|e| format!("Failed to start notifier: {}", e))?; let api_secret = ApiSecret::create_or_open(&self.config.validator_dir)?; diff --git a/validator_client/src/notifier.rs b/validator_client/src/notifier.rs index 8ca1d7c5b4..e72bd545da 100644 --- a/validator_client/src/notifier.rs +++ b/validator_client/src/notifier.rs @@ -72,6 +72,11 @@ async fn notify( let total_validators = duties_service.total_validator_count(); let proposing_validators = duties_service.proposer_count(epoch); let attesting_validators = duties_service.attester_count(epoch); + let doppelganger_detecting_validators = duties_service.doppelganger_detecting_count(); + + if doppelganger_detecting_validators > 0 { + info!(log, "Searching for doppelgangers on the network"; "doppelganger_detecting_validators" => doppelganger_detecting_validators) + } if total_validators == 0 { info!( diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index 96024990e6..54cef8b678 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -1,21 +1,36 @@ use crate::{ - fork_service::ForkService, http_metrics::metrics, initialized_validators::InitializedValidators, + doppelganger_service::DoppelgangerService, fork_service::ForkService, http_metrics::metrics, + initialized_validators::InitializedValidators, }; use account_utils::{validator_definitions::ValidatorDefinition, ZeroizeString}; use parking_lot::{Mutex, RwLock}; use slashing_protection::{NotSafe, Safe, SlashingDatabase}; use slog::{crit, error, info, warn, Logger}; use slot_clock::SlotClock; +use std::iter::FromIterator; use std::path::Path; use std::sync::Arc; -use tempfile::TempDir; use types::{ - graffiti::GraffitiString, Attestation, BeaconBlock, ChainSpec, Domain, Epoch, EthSpec, Fork, - Graffiti, Hash256, Keypair, PublicKeyBytes, SelectionProof, Signature, SignedAggregateAndProof, - 
SignedBeaconBlock, SignedRoot, Slot, + attestation::Error as AttestationError, graffiti::GraffitiString, Attestation, BeaconBlock, + ChainSpec, Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, Keypair, PublicKeyBytes, + SelectionProof, Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedRoot, Slot, }; use validator_dir::ValidatorDir; +pub use crate::doppelganger_service::DoppelgangerStatus; + +#[derive(Debug, PartialEq)] +pub enum Error { + DoppelgangerProtected(PublicKeyBytes), + UnknownToDoppelgangerService(PublicKeyBytes), + UnknownPubkey(PublicKeyBytes), + Slashable(NotSafe), + SameData, + GreaterThanCurrentSlot { slot: Slot, current_slot: Slot }, + GreaterThanCurrentEpoch { epoch: Epoch, current_epoch: Epoch }, + UnableToSignAttestation(AttestationError), +} + /// Number of epochs of slashing protection history to keep. /// /// This acts as a maximum safe-guard against clock drift. @@ -46,7 +61,6 @@ impl PartialEq for LocalValidator { } } -#[derive(Clone)] pub struct ValidatorStore { validators: Arc>, slashing_protection: SlashingDatabase, @@ -54,8 +68,9 @@ pub struct ValidatorStore { genesis_validators_root: Hash256, spec: Arc, log: Logger, - temp_dir: Option>, + doppelganger_service: Option>, fork_service: ForkService, + slot_clock: T, } impl ValidatorStore { @@ -65,6 +80,7 @@ impl ValidatorStore { genesis_validators_root: Hash256, spec: ChainSpec, fork_service: ForkService, + doppelganger_service: Option>, log: Logger, ) -> Self { Self { @@ -73,12 +89,32 @@ impl ValidatorStore { slashing_protection_last_prune: Arc::new(Mutex::new(Epoch::new(0))), genesis_validators_root, spec: Arc::new(spec), - log, - temp_dir: None, + log: log.clone(), + doppelganger_service, + slot_clock: fork_service.slot_clock(), fork_service, } } + /// Register all local validators in doppelganger protection to try and prevent instances of + /// duplicate validators operating on the network at the same time. 
+ /// + /// This function has no effect if doppelganger protection is disabled. + pub fn register_all_in_doppelganger_protection_if_enabled(&self) -> Result<(), String> { + if let Some(doppelganger_service) = &self.doppelganger_service { + for pubkey in self.validators.read().iter_voting_pubkeys() { + doppelganger_service.register_new_validator::(*pubkey, &self.slot_clock)? + } + } + + Ok(()) + } + + /// Returns `true` if doppelganger protection is enabled, or else `false`. + pub fn doppelganger_protection_enabled(&self) -> bool { + self.doppelganger_service.is_some() + } + pub fn initialized_validators(&self) -> Arc> { self.validators.clone() } @@ -105,12 +141,19 @@ impl ValidatorStore { ) .map_err(|e| format!("failed to create validator definitions: {:?}", e))?; + let validator_pubkey = validator_def.voting_public_key.compress(); + self.slashing_protection - .register_validator(validator_def.voting_public_key.compress()) + .register_validator(validator_pubkey) .map_err(|e| format!("failed to register validator: {:?}", e))?; validator_def.enabled = enable; + if let Some(doppelganger_service) = &self.doppelganger_service { + doppelganger_service + .register_new_validator::(validator_pubkey, &self.slot_clock)?; + } + self.validators .write() .add_definition(validator_def.clone()) @@ -120,14 +163,92 @@ impl ValidatorStore { Ok(validator_def) } - pub fn voting_pubkeys(&self) -> Vec { - self.validators + /// Attempts to resolve the pubkey to a validator index. + /// + /// It may return `None` if the `pubkey` is: + /// + /// - Unknown. + /// - Known, but with an unknown index. + pub fn validator_index(&self, pubkey: &PublicKeyBytes) -> Option { + self.validators.read().get_index(pubkey) + } + + /// Returns all voting pubkeys for all enabled validators. + /// + /// The `filter_func` allows for filtering pubkeys based upon their `DoppelgangerStatus`. 
There + /// are two primary functions used here: + /// + /// - `DoppelgangerStatus::only_safe`: only returns pubkeys which have passed doppelganger + /// protection and are safe-enough to sign messages. + /// - `DoppelgangerStatus::ignored`: returns all the pubkeys from `only_safe` *plus* those still + /// undergoing protection. This is useful for collecting duties or other non-signing tasks. + #[allow(clippy::needless_collect)] // Collect is required to avoid holding a lock. + pub fn voting_pubkeys(&self, filter_func: F) -> I + where + I: FromIterator, + F: Fn(DoppelgangerStatus) -> Option, + { + // Collect all the pubkeys first to avoid interleaving locks on `self.validators` and + // `self.doppelganger_service()`. + let pubkeys = self + .validators .read() .iter_voting_pubkeys() .cloned() + .collect::>(); + + pubkeys + .into_iter() + .map(|pubkey| { + self.doppelganger_service + .as_ref() + .map(|doppelganger_service| doppelganger_service.validator_status(pubkey)) + // Allow signing on all pubkeys if doppelganger protection is disabled. + .unwrap_or_else(|| DoppelgangerStatus::SigningEnabled(pubkey)) + }) + .filter_map(filter_func) .collect() } + /// Returns doppelganger statuses for all enabled validators. + #[allow(clippy::needless_collect)] // Collect is required to avoid holding a lock. + pub fn doppelganger_statuses(&self) -> Vec { + // Collect all the pubkeys first to avoid interleaving locks on `self.validators` and + // `self.doppelganger_service`. + let pubkeys = self + .validators + .read() + .iter_voting_pubkeys() + .cloned() + .collect::>(); + + pubkeys + .into_iter() + .map(|pubkey| { + self.doppelganger_service + .as_ref() + .map(|doppelganger_service| doppelganger_service.validator_status(pubkey)) + // Allow signing on all pubkeys if doppelganger protection is disabled. 
+ .unwrap_or_else(|| DoppelgangerStatus::SigningEnabled(pubkey)) + }) + .collect() + } + + /// Check if the `validator_pubkey` is permitted by the doppelganger protection to sign + /// messages. + pub fn doppelganger_protection_allows_signing(&self, validator_pubkey: PublicKeyBytes) -> bool { + self.doppelganger_service + .as_ref() + // If there's no doppelganger service then we assume it is purposefully disabled and + // declare that all keys are safe with regard to it. + .map_or(true, |doppelganger_service| { + doppelganger_service + .validator_status(validator_pubkey) + .only_safe() + .is_some() + }) + } + pub fn num_voting_validators(&self) -> usize { self.validators.read().num_enabled() } @@ -136,25 +257,56 @@ impl ValidatorStore { self.fork_service.fork() } + /// Runs `func`, providing it access to the `Keypair` corresponding to `validator_pubkey`. + /// + /// This forms the canonical point for accessing the secret key of some validator. It is + /// structured as a `with_...` function since we need to pass-through a read-lock in order to + /// access the keypair. + /// + /// Access to keypairs might be restricted by other internal mechanisms (e.g., doppelganger + /// protection). + /// + /// ## Warning + /// + /// This function takes a read-lock on `self.validators`. To prevent deadlocks, it is advised to + /// never take any sort of concurrency lock inside this function. + fn with_validator_keypair( + &self, + validator_pubkey: PublicKeyBytes, + func: F, + ) -> Result + where + F: FnOnce(&Keypair) -> R, + { + // If the doppelganger service is active, check to ensure it explicitly permits signing by + // this validator.
+ if !self.doppelganger_protection_allows_signing(validator_pubkey) { + return Err(Error::DoppelgangerProtected(validator_pubkey)); + } + + let validators_lock = self.validators.read(); + + Ok(func( + validators_lock + .voting_keypair(&validator_pubkey) + .ok_or(Error::UnknownPubkey(validator_pubkey))?, + )) + } + pub fn randao_reveal( &self, - validator_pubkey: &PublicKeyBytes, + validator_pubkey: PublicKeyBytes, epoch: Epoch, - ) -> Option { - self.validators - .read() - .voting_keypair(validator_pubkey) - .map(|voting_keypair| { - let domain = self.spec.get_domain( - epoch, - Domain::Randao, - &self.fork(), - self.genesis_validators_root, - ); - let message = epoch.signing_root(domain); + ) -> Result { + let domain = self.spec.get_domain( + epoch, + Domain::Randao, + &self.fork(), + self.genesis_validators_root, + ); + let message = epoch.signing_root(domain); - voting_keypair.sk.sign(message) - }) + self.with_validator_keypair(validator_pubkey, |keypair| keypair.sk.sign(message)) } pub fn graffiti(&self, validator_pubkey: &PublicKeyBytes) -> Option { @@ -163,10 +315,10 @@ impl ValidatorStore { pub fn sign_block( &self, - validator_pubkey: &PublicKeyBytes, + validator_pubkey: PublicKeyBytes, block: BeaconBlock, current_slot: Slot, - ) -> Option> { + ) -> Result, Error> { // Make sure the block slot is not higher than the current slot to avoid potential attacks. if block.slot() > current_slot { warn!( @@ -175,7 +327,10 @@ impl ValidatorStore { "block_slot" => block.slot().as_u64(), "current_slot" => current_slot.as_u64() ); - return None; + return Err(Error::GreaterThanCurrentSlot { + slot: block.slot(), + current_slot, + }); } // Check for slashing conditions. @@ -188,25 +343,19 @@ impl ValidatorStore { ); let slashing_status = self.slashing_protection.check_and_insert_block_proposal( - validator_pubkey, + &validator_pubkey, &block.block_header(), domain, ); match slashing_status { - // We can safely sign this block. 
+ // We can safely sign this block without slashing. Ok(Safe::Valid) => { - let validators = self.validators.read(); - let voting_keypair = validators.voting_keypair(validator_pubkey)?; - metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SUCCESS]); - Some(block.sign( - &voting_keypair.sk, - &fork, - self.genesis_validators_root, - &self.spec, - )) + self.with_validator_keypair(validator_pubkey, move |keypair| { + block.sign(&keypair.sk, &fork, self.genesis_validators_root, &self.spec) + }) } Ok(Safe::SameData) => { warn!( @@ -214,7 +363,7 @@ impl ValidatorStore { "Skipping signing of previously signed block"; ); metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SAME_DATA]); - None + Err(Error::SameData) } Err(NotSafe::UnregisteredValidator(pk)) => { warn!( @@ -224,7 +373,7 @@ impl ValidatorStore { "public_key" => format!("{:?}", pk) ); metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::UNREGISTERED]); - None + Err(Error::Slashable(NotSafe::UnregisteredValidator(pk))) } Err(e) => { crit!( @@ -233,21 +382,24 @@ impl ValidatorStore { "error" => format!("{:?}", e) ); metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SLASHABLE]); - None + Err(Error::Slashable(e)) } } } pub fn sign_attestation( &self, - validator_pubkey: &PublicKeyBytes, + validator_pubkey: PublicKeyBytes, validator_committee_position: usize, attestation: &mut Attestation, current_epoch: Epoch, - ) -> Option<()> { + ) -> Result<(), Error> { // Make sure the target epoch is not higher than the current epoch to avoid potential attacks. if attestation.data.target.epoch > current_epoch { - return None; + return Err(Error::GreaterThanCurrentEpoch { + epoch: attestation.data.target.epoch, + current_epoch, + }); } // Checking for slashing conditions. 
@@ -260,7 +412,7 @@ impl ValidatorStore { self.genesis_validators_root, ); let slashing_status = self.slashing_protection.check_and_insert_attestation( - validator_pubkey, + &validator_pubkey, &attestation.data, domain, ); @@ -268,29 +420,20 @@ impl ValidatorStore { match slashing_status { // We can safely sign this attestation. Ok(Safe::Valid) => { - let validators = self.validators.read(); - let voting_keypair = validators.voting_keypair(validator_pubkey)?; - - attestation - .sign( - &voting_keypair.sk, + self.with_validator_keypair(validator_pubkey, |keypair| { + attestation.sign( + &keypair.sk, validator_committee_position, &fork, self.genesis_validators_root, &self.spec, ) - .map_err(|e| { - error!( - self.log, - "Error whilst signing attestation"; - "error" => format!("{:?}", e) - ) - }) - .ok()?; + })? + .map_err(Error::UnableToSignAttestation)?; metrics::inc_counter_vec(&metrics::SIGNED_ATTESTATIONS_TOTAL, &[metrics::SUCCESS]); - Some(()) + Ok(()) } Ok(Safe::SameData) => { warn!( @@ -301,7 +444,7 @@ impl ValidatorStore { &metrics::SIGNED_ATTESTATIONS_TOTAL, &[metrics::SAME_DATA], ); - None + Err(Error::SameData) } Err(NotSafe::UnregisteredValidator(pk)) => { warn!( @@ -314,7 +457,7 @@ impl ValidatorStore { &metrics::SIGNED_ATTESTATIONS_TOTAL, &[metrics::UNREGISTERED], ); - None + Err(Error::Slashable(NotSafe::UnregisteredValidator(pk))) } Err(e) => { crit!( @@ -327,7 +470,7 @@ impl ValidatorStore { &metrics::SIGNED_ATTESTATIONS_TOTAL, &[metrics::SLASHABLE], ); - None + Err(Error::Slashable(e)) } } } @@ -338,46 +481,64 @@ impl ValidatorStore { /// modified by actors other than the signing validator. 
pub fn produce_signed_aggregate_and_proof( &self, - validator_pubkey: &PublicKeyBytes, + validator_pubkey: PublicKeyBytes, validator_index: u64, aggregate: Attestation, selection_proof: SelectionProof, - ) -> Option> { - let validators = self.validators.read(); - let voting_keypair = &validators.voting_keypair(validator_pubkey)?; + ) -> Result, Error> { + // Take the fork early to avoid lock interleaving. + let fork = self.fork(); + + let proof = self.with_validator_keypair(validator_pubkey, move |keypair| { + SignedAggregateAndProof::from_aggregate( + validator_index, + aggregate, + Some(selection_proof), + &keypair.sk, + &fork, + self.genesis_validators_root, + &self.spec, + ) + })?; metrics::inc_counter_vec(&metrics::SIGNED_AGGREGATES_TOTAL, &[metrics::SUCCESS]); - Some(SignedAggregateAndProof::from_aggregate( - validator_index, - aggregate, - Some(selection_proof), - &voting_keypair.sk, - &self.fork(), - self.genesis_validators_root, - &self.spec, - )) + Ok(proof) } /// Produces a `SelectionProof` for the `slot`, signed by with corresponding secret key to /// `validator_pubkey`. pub fn produce_selection_proof( &self, - validator_pubkey: &PublicKeyBytes, + validator_pubkey: PublicKeyBytes, slot: Slot, - ) -> Option { - let validators = self.validators.read(); - let voting_keypair = &validators.voting_keypair(validator_pubkey)?; + ) -> Result { + // Take the fork early to avoid lock interleaving. + let fork = self.fork(); + + // Bypass the `with_validator_keypair` function. + // + // This is because we don't care about doppelganger protection when it comes to selection + // proofs. They are not slashable and we need them to subscribe to subnets on the BN. + // + // As long as we disallow `SignedAggregateAndProof` then these selection proofs will never + // be published on the network. 
+ let validators_lock = self.validators.read(); + let keypair = validators_lock + .voting_keypair(&validator_pubkey) + .ok_or(Error::UnknownPubkey(validator_pubkey))?; + + let proof = SelectionProof::new::( + slot, + &keypair.sk, + &fork, + self.genesis_validators_root, + &self.spec, + ); metrics::inc_counter_vec(&metrics::SIGNED_SELECTION_PROOFS_TOTAL, &[metrics::SUCCESS]); - Some(SelectionProof::new::( - slot, - &voting_keypair.sk, - &self.fork(), - self.genesis_validators_root, - &self.spec, - )) + Ok(proof) } /// Prune the slashing protection database so that it remains performant. @@ -411,10 +572,11 @@ impl ValidatorStore { let new_min_target_epoch = current_epoch.saturating_sub(SLASHING_PROTECTION_HISTORY_EPOCHS); let new_min_slot = new_min_target_epoch.start_slot(E::slots_per_epoch()); - let validators = self.validators.read(); + let all_pubkeys: Vec<_> = self.voting_pubkeys(DoppelgangerStatus::ignored); + if let Err(e) = self .slashing_protection - .prune_all_signed_attestations(validators.iter_voting_pubkeys(), new_min_target_epoch) + .prune_all_signed_attestations(all_pubkeys.iter(), new_min_target_epoch) { error!( self.log, @@ -426,7 +588,7 @@ impl ValidatorStore { if let Err(e) = self .slashing_protection - .prune_all_signed_blocks(validators.iter_voting_pubkeys(), new_min_slot) + .prune_all_signed_blocks(all_pubkeys.iter(), new_min_slot) { error!( self.log,