diff --git a/beacon_node/beacon_chain/src/payload_bid_verification/tests.rs b/beacon_node/beacon_chain/src/payload_bid_verification/tests.rs index b7b77d5d2a..c68e6d9d32 100644 --- a/beacon_node/beacon_chain/src/payload_bid_verification/tests.rs +++ b/beacon_node/beacon_chain/src/payload_bid_verification/tests.rs @@ -252,11 +252,11 @@ fn make_signed_preferences( ) -> Arc { Arc::new(SignedProposerPreferences { message: ProposerPreferences { + dependent_root: Hash256::ZERO, proposal_slot, validator_index, fee_recipient, gas_limit, - ..ProposerPreferences::default() }, signature: Signature::empty(), }) diff --git a/beacon_node/beacon_chain/src/proposer_preferences_verification/gossip_verified_proposer_preferences.rs b/beacon_node/beacon_chain/src/proposer_preferences_verification/gossip_verified_proposer_preferences.rs index cc77453c49..4ba33fde72 100644 --- a/beacon_node/beacon_chain/src/proposer_preferences_verification/gossip_verified_proposer_preferences.rs +++ b/beacon_node/beacon_chain/src/proposer_preferences_verification/gossip_verified_proposer_preferences.rs @@ -154,7 +154,9 @@ impl BeaconChain { #[cfg(test)] mod tests { - use types::{Address, BeaconState, EthSpec, MinimalEthSpec, ProposerPreferences, Slot}; + use types::{ + Address, BeaconState, EthSpec, Hash256, MinimalEthSpec, ProposerPreferences, Slot, + }; use super::verify_preferences_consistency; use crate::proposer_preferences_verification::ProposerPreferencesError; @@ -163,7 +165,7 @@ mod tests { fn make_preferences(proposal_slot: Slot, validator_index: u64) -> ProposerPreferences { ProposerPreferences { - dependent_root: types::Hash256::ZERO, + dependent_root: Hash256::ZERO, proposal_slot, validator_index, fee_recipient: Address::ZERO, diff --git a/beacon_node/beacon_chain/src/proposer_preferences_verification/proposer_preference_cache.rs b/beacon_node/beacon_chain/src/proposer_preferences_verification/proposer_preference_cache.rs index 507e61dc10..7bbdf34888 100644 --- a/beacon_node/beacon_chain/src/proposer_preferences_verification/proposer_preference_cache.rs +++ b/beacon_node/beacon_chain/src/proposer_preferences_verification/proposer_preference_cache.rs @@ -70,20 +70,24 @@ mod tests { use std::sync::Arc; use bls::Signature; - use types::{Address, ProposerPreferences, SignedProposerPreferences, Slot}; + use types::{Address, Hash256, ProposerPreferences, SignedProposerPreferences, Slot}; use super::GossipVerifiedProposerPreferenceCache; use crate::proposer_preferences_verification::gossip_verified_proposer_preferences::GossipVerifiedProposerPreferences; - fn make_gossip_verified(slot: Slot, validator_index: u64) -> GossipVerifiedProposerPreferences { + fn make_gossip_verified( + slot: Slot, + validator_index: u64, + dependent_root: Hash256, + ) -> GossipVerifiedProposerPreferences { GossipVerifiedProposerPreferences { signed_preferences: Arc::new(SignedProposerPreferences { message: ProposerPreferences { + dependent_root, proposal_slot: slot, validator_index, fee_recipient: Address::ZERO, gas_limit: 30_000_000, - ..ProposerPreferences::default() }, signature: Signature::empty(), }), @@ -93,9 +97,10 @@ mod tests { #[test] fn prune_removes_old_retains_current() { let cache = GossipVerifiedProposerPreferenceCache::default(); + let root = Hash256::ZERO; for slot in [1, 2, 3, 7, 8, 9, 10] { - let verified = make_gossip_verified(Slot::new(slot), slot); + let verified = make_gossip_verified(Slot::new(slot), slot, root); cache.insert_seen_validator(&verified); cache.insert_preferences(verified); } @@ -104,11 +109,26 @@ mod tests { for slot in [1, 2, 3, 7] { assert!(cache.get_preferences(&Slot::new(slot)).is_none()); - assert!(!cache.get_seen_validator(&Slot::new(slot), types::Hash256::ZERO, slot)); + assert!(!cache.get_seen_validator(&Slot::new(slot), root, slot)); } for slot in [8, 9, 10] { assert!(cache.get_preferences(&Slot::new(slot)).is_some()); - assert!(cache.get_seen_validator(&Slot::new(slot), types::Hash256::ZERO, slot)); + assert!(cache.get_seen_validator(&Slot::new(slot), root, slot)); } } + + #[test] + fn different_dependent_roots_not_deduped() { + let cache = GossipVerifiedProposerPreferenceCache::default(); + let slot = Slot::new(5); + let root_a = Hash256::repeat_byte(0xaa); + let root_b = Hash256::repeat_byte(0xbb); + let validator_index = 42; + + let verified_a = make_gossip_verified(slot, validator_index, root_a); + cache.insert_seen_validator(&verified_a); + + assert!(cache.get_seen_validator(&slot, root_a, validator_index)); + assert!(!cache.get_seen_validator(&slot, root_b, validator_index)); + } } diff --git a/beacon_node/beacon_chain/src/proposer_preferences_verification/tests.rs b/beacon_node/beacon_chain/src/proposer_preferences_verification/tests.rs index ce2ea12bb5..468e08ff3b 100644 --- a/beacon_node/beacon_chain/src/proposer_preferences_verification/tests.rs +++ b/beacon_node/beacon_chain/src/proposer_preferences_verification/tests.rs @@ -127,11 +127,11 @@ fn make_signed_preferences( ) -> Arc { Arc::new(SignedProposerPreferences { message: ProposerPreferences { + dependent_root: Hash256::ZERO, proposal_slot, validator_index, fee_recipient: Address::ZERO, gas_limit: 30_000_000, - ..ProposerPreferences::default() }, signature: Signature::empty(), }) @@ -231,11 +231,10 @@ fn correct_proposer_bad_signature() { result, Err(ProposerPreferencesError::BadSignature) )); - assert!(!ctx.preferences_cache.get_seen_validator( - &slot, - types::Hash256::ZERO, - actual_proposer - )); + assert!( + !ctx.preferences_cache + .get_seen_validator(&slot, Hash256::ZERO, actual_proposer) + ); assert!(ctx.preferences_cache.get_preferences(&slot).is_none()); } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index b2d069f384..f31817c5ba 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1490,7 +1490,7 @@ pub fn serve( // POST beacon/pool/payload_attestations let post_beacon_pool_payload_attestations = post_beacon_pool_payload_attestations( &network_tx_filter, - optional_consensus_version_header_filter, + optional_consensus_version_header_filter.clone(), &beacon_pool_path, ); @@ -1510,6 +1510,22 @@ pub fn serve( let post_beacon_pool_bls_to_execution_changes = post_beacon_pool_bls_to_execution_changes(&network_tx_filter, &beacon_pool_path); + // POST validator/proposer_preferences (JSON) + let post_validator_proposer_preferences = post_validator_proposer_preferences( + eth_v1.clone(), + task_spawner_filter.clone(), + chain_filter.clone(), + network_tx_filter.clone(), + ); + + // POST validator/proposer_preferences (SSZ) + let post_validator_proposer_preferences_ssz = post_validator_proposer_preferences_ssz( + eth_v1.clone(), + task_spawner_filter.clone(), + chain_filter.clone(), + network_tx_filter.clone(), + ); + // POST beacon/execution_payload_envelope let post_beacon_execution_payload_envelope = post_beacon_execution_payload_envelope( eth_v1.clone(), @@ -3416,7 +3432,8 @@ pub fn serve( .uor(post_beacon_blinded_blocks_ssz) .uor(post_beacon_blinded_blocks_v2_ssz) .uor(post_beacon_execution_payload_envelope_ssz) - .uor(post_beacon_pool_payload_attestations_ssz), + .uor(post_beacon_pool_payload_attestations_ssz) + .uor(post_validator_proposer_preferences_ssz), ) .uor(post_beacon_blocks) .uor(post_beacon_blinded_blocks) @@ -3429,6 +3446,7 @@ pub fn serve( .uor(post_beacon_pool_sync_committees) .uor(post_beacon_pool_payload_attestations) .uor(post_beacon_pool_bls_to_execution_changes) + .uor(post_validator_proposer_preferences) .uor(post_beacon_execution_payload_envelope) .uor(post_beacon_state_validators) .uor(post_beacon_state_validator_balances) diff --git a/beacon_node/http_api/src/validator/mod.rs b/beacon_node/http_api/src/validator/mod.rs index 27fe5de6e7..044f2089ce 100644 --- a/beacon_node/http_api/src/validator/mod.rs +++ b/beacon_node/http_api/src/validator/mod.rs @@ -9,8 +9,11 @@ use crate::utils::{ use crate::version::{V1, V2, V3, unsupported_version_rejection}; use crate::{StateId, attester_duties, proposer_duties, ptc_duties, sync_committees}; use beacon_chain::attestation_verification::VerifiedAttestation; +use beacon_chain::proposer_preferences_verification::ProposerPreferencesError; use beacon_chain::{AttestationError, BeaconChain, BeaconChainError, BeaconChainTypes}; use bls::PublicKeyBytes; +use bytes::Bytes; +use eth2::CONSENSUS_VERSION_HEADER; use eth2::types::{ Accept, BeaconCommitteeSubscription, EndpointVersion, Failure, GenericResponse, StandardLivenessResponseData, StateId as CoreStateId, ValidatorAggregateAttestationQuery, @@ -20,14 +23,15 @@ use lighthouse_network::PubsubMessage; use network::{NetworkMessage, ValidatorSubscriptionMessage}; use reqwest::StatusCode; use slot_clock::SlotClock; +use ssz::Decode; use std::sync::Arc; use tokio::sync::mpsc::{Sender, UnboundedSender}; use tokio::sync::oneshot; use tracing::{debug, error, info, warn}; use types::{ - BeaconState, Epoch, EthSpec, ProposerPreparationData, SignedAggregateAndProof, - SignedContributionAndProof, SignedValidatorRegistrationData, Slot, SyncContributionData, - ValidatorSubscription, + BeaconState, Epoch, EthSpec, ForkName, ProposerPreparationData, SignedAggregateAndProof, + SignedContributionAndProof, SignedProposerPreferences, SignedValidatorRegistrationData, Slot, + SyncContributionData, ValidatorSubscription, }; use warp::{Filter, Rejection, Reply}; use warp_utils::reject::convert_rejection; @@ -1144,3 +1148,117 @@ pub fn get_validator_duties_proposer( ) .boxed() } + +/// POST validator/proposer_preferences (JSON) +pub fn post_validator_proposer_preferences( + eth_v1: EthV1Filter, + task_spawner_filter: TaskSpawnerFilter, + chain_filter: ChainFilter, + network_tx_filter: NetworkTxFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("proposer_preferences")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(warp::header::(CONSENSUS_VERSION_HEADER)) + .and(task_spawner_filter) + .and(chain_filter) + .and(network_tx_filter) + .then( + |preferences: Vec, + _fork_name: ForkName, + task_spawner: TaskSpawner, + chain: Arc>, + network_tx: UnboundedSender>| { + task_spawner.blocking_response_task(Priority::P0, move || { + publish_proposer_preferences(&chain, &network_tx, preferences)?; + Ok(warp::reply()) + }) + }, + ) + .boxed() +} + +/// POST validator/proposer_preferences (SSZ) +pub fn post_validator_proposer_preferences_ssz( + eth_v1: EthV1Filter, + task_spawner_filter: TaskSpawnerFilter, + chain_filter: ChainFilter, + network_tx_filter: NetworkTxFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("proposer_preferences")) + .and(warp::path::end()) + .and(warp::body::bytes()) + .and(warp::header::(CONSENSUS_VERSION_HEADER)) + .and(task_spawner_filter) + .and(chain_filter) + .and(network_tx_filter) + .then( + |body_bytes: Bytes, + _fork_name: ForkName, + task_spawner: TaskSpawner, + chain: Arc>, + network_tx: UnboundedSender>| { + task_spawner.blocking_response_task(Priority::P0, move || { + let preferences = Vec::::from_ssz_bytes(&body_bytes) + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}")) + })?; + publish_proposer_preferences(&chain, &network_tx, preferences)?; + Ok(warp::reply()) + }) + }, + ) + .boxed() +} + +fn publish_proposer_preferences( + chain: &BeaconChain, + network_tx: &UnboundedSender>, + preferences_list: Vec, +) -> Result<(), warp::Rejection> { + let mut failures = vec![]; + let mut num_already_known = 0; + + for (index, preferences) in preferences_list.into_iter().enumerate() { + let validator_index = preferences.message.validator_index; + match chain.verify_proposer_preferences_for_gossip(Arc::new(preferences)) { + Ok(verified) => { + crate::utils::publish_pubsub_message( + network_tx, + PubsubMessage::ProposerPreferences(verified.signed_preferences), + )?; + } + Err(ProposerPreferencesError::AlreadySeen { .. }) => { + num_already_known += 1; + } + Err(e) => { + error!( + error = ?e, + %validator_index, + "Failure verifying proposer preferences for gossip" + ); + failures.push(Failure::new(index, format!("{e:?}"))); + } + } + } + + if num_already_known > 0 { + debug!( + count = num_already_known, + "Some proposer preferences already known" + ); + } + + if failures.is_empty() { + Ok(()) + } else { + Err(warp_utils::reject::indexed_bad_request( + "error processing proposer preferences".to_string(), + failures, + )) + } +} diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 7d351e9331..d6c621f996 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -48,9 +48,10 @@ use tokio::time::Duration; use tree_hash::TreeHash; use types::ApplicationDomain; use types::{ - Domain, EthSpec, ExecutionBlockHash, Hash256, MainnetEthSpec, RelativeEpoch, SelectionProof, - SignedExecutionPayloadEnvelope, SignedRoot, SingleAttestation, Slot, - attestation::AttestationBase, consts::gloas::BUILDER_INDEX_SELF_BUILD, + Address, Domain, EthSpec, ExecutionBlockHash, Hash256, MainnetEthSpec, ProposerPreferences, + RelativeEpoch, SelectionProof, SignedExecutionPayloadEnvelope, SignedProposerPreferences, + SignedRoot, SingleAttestation, Slot, attestation::AttestationBase, + consts::gloas::BUILDER_INDEX_SELF_BUILD, }; type E = MainnetEthSpec; @@ -2898,6 +2899,162 @@ impl ApiTester { self } + fn make_valid_signed_proposer_preferences( + &self, + slot_offset: usize, + ) -> SignedProposerPreferences { + let head = self.chain.head_snapshot(); + let head_slot = head.beacon_block.slot(); + let head_state = &head.beacon_state; + let genesis_validators_root = self.chain.genesis_validators_root; + + let proposer_lookahead = head_state + .proposer_lookahead() + .expect("should get proposer_lookahead"); + + // Pick a future slot in the next epoch to ensure it's always valid. + // The lookahead covers 2 epochs: index = epoch_offset * slots_per_epoch + slot_in_epoch. + let slots_per_epoch = E::slots_per_epoch() as usize; + let next_epoch = head_slot.epoch(E::slots_per_epoch()) + 1; + let next_epoch_start = next_epoch.start_slot(E::slots_per_epoch()); + let proposal_slot = next_epoch_start + Slot::new((slot_offset % slots_per_epoch) as u64); + + let lookahead_index = slots_per_epoch + (slot_offset % slots_per_epoch); + let validator_index = *proposer_lookahead + .get(lookahead_index) + .expect("slot index should be in lookahead") as usize; + + let preferences = ProposerPreferences { + dependent_root: Hash256::ZERO, + proposal_slot, + validator_index: validator_index as u64, + fee_recipient: Address::repeat_byte(0xaa), + gas_limit: 30_000_000, + }; + + let epoch = proposal_slot.epoch(E::slots_per_epoch()); + let fork = head_state.fork(); + let domain = self.chain.spec.get_domain( + epoch, + Domain::ProposerPreferences, + &fork, + genesis_validators_root, + ); + let signing_root = preferences.signing_root(domain); + let sk = &self.validator_keypairs()[validator_index].sk; + let signature = sk.sign(signing_root); + + SignedProposerPreferences { + message: preferences, + signature, + } + } + + // Each sub-test uses a unique slot_offset (1-5) because the gossip cache deduplicates on + // (slot, dependent_root, validator_index). Reusing an offset from an earlier test would hit + // "already seen" instead of testing the intended condition. + pub async fn test_post_validator_proposer_preferences_valid(mut self) -> Self { + let signed = self.make_valid_signed_proposer_preferences(1); + let fork_name = self + .chain + .spec + .fork_name_at_slot::(signed.message.proposal_slot); + + self.client + .post_validator_proposer_preferences(&[signed], fork_name) + .await + .unwrap(); + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid proposer preferences should be sent to network" + ); + + self + } + + pub async fn test_post_validator_proposer_preferences_valid_ssz(mut self) -> Self { + let signed = self.make_valid_signed_proposer_preferences(2); + let fork_name = self + .chain + .spec + .fork_name_at_slot::(signed.message.proposal_slot); + + self.client + .post_validator_proposer_preferences_ssz(&vec![signed], fork_name) + .await + .unwrap(); + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid proposer preferences (SSZ) should be sent to network" + ); + + self + } + + pub async fn test_post_validator_proposer_preferences_invalid_sig(self) -> Self { + let mut signed = self.make_valid_signed_proposer_preferences(3); + signed.signature = Signature::empty(); + let fork_name = self + .chain + .spec + .fork_name_at_slot::(signed.message.proposal_slot); + + let result = self + .client + .post_validator_proposer_preferences(&[signed], fork_name) + .await; + + assert!(result.is_err(), "invalid signature should be rejected"); + + self + } + + pub async fn test_post_validator_proposer_preferences_invalid_sig_ssz(self) -> Self { + let mut signed = self.make_valid_signed_proposer_preferences(4); + signed.signature = Signature::empty(); + let fork_name = self + .chain + .spec + .fork_name_at_slot::(signed.message.proposal_slot); + + let result = self + .client + .post_validator_proposer_preferences_ssz(&vec![signed], fork_name) + .await; + + assert!( + result.is_err(), + "invalid signature should be rejected via SSZ route" + ); + + self + } + + pub async fn test_post_validator_proposer_preferences_duplicate(mut self) -> Self { + let signed = self.make_valid_signed_proposer_preferences(5); + let fork_name = self + .chain + .spec + .fork_name_at_slot::(signed.message.proposal_slot); + + // First submission should succeed. + self.client + .post_validator_proposer_preferences(std::slice::from_ref(&signed), fork_name) + .await + .unwrap(); + self.network_rx.network_recv.recv().await; + + // Second submission of the same preferences should return 200 (already known, not an error). + self.client + .post_validator_proposer_preferences(&[signed], fork_name) + .await + .unwrap(); + + self + } + pub async fn test_get_config_fork_schedule(self) -> Self { let result = self.client.get_config_fork_schedule().await.unwrap().data; @@ -9199,3 +9356,22 @@ async fn get_validator_blocks_v3_http_api_path() { .get_validator_blocks_v3_path_graffiti_policy() .await; } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn post_validator_proposer_preferences() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + ApiTester::new_with_hard_forks() + .await + .test_post_validator_proposer_preferences_valid() + .await + .test_post_validator_proposer_preferences_valid_ssz() + .await + .test_post_validator_proposer_preferences_invalid_sig() + .await + .test_post_validator_proposer_preferences_invalid_sig_ssz() + .await + .test_post_validator_proposer_preferences_duplicate() + .await; +} diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 9875d4b0c4..e5a703ff1e 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -51,7 +51,7 @@ pub enum PubsubMessage { /// Gossipsub message providing notification of a signed execution payload bid. ExecutionPayloadBid(Box>), /// Gossipsub message providing notification of signed proposer preferences. - ProposerPreferences(Box), + ProposerPreferences(Arc), /// Gossipsub message providing notification of a light client finality update. LightClientFinalityUpdate(Box>), /// Gossipsub message providing notification of a light client optimistic update. @@ -388,7 +388,7 @@ impl PubsubMessage { GossipKind::ProposerPreferences => { let proposer_preferences = SignedProposerPreferences::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - Ok(PubsubMessage::ProposerPreferences(Box::new( + Ok(PubsubMessage::ProposerPreferences(Arc::new( proposer_preferences, ))) } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index b11e78bd9b..adbd2a426c 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -526,15 +526,11 @@ impl NetworkBeaconProcessor { self: &Arc, message_id: MessageId, peer_id: PeerId, - proposer_preferences: Box, + proposer_preferences: Arc, ) -> Result<(), Error> { let processor = self.clone(); let process_fn = move || { - processor.process_gossip_proposer_preferences( - message_id, - peer_id, - Arc::new(*proposer_preferences), - ) + processor.process_gossip_proposer_preferences(message_id, peer_id, proposer_preferences) }; self.try_send(BeaconWorkEvent { diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 443fa51cc6..5326778794 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -26,6 +26,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, trace, warn}; use types::{ BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, PartialDataColumn, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, }; /// Handles messages from the network and routes them to the appropriate service to be handled. @@ -341,10 +342,19 @@ impl Router { Response::DataColumnsByRange(data_column) => { self.on_data_columns_by_range_response(peer_id, app_request_id, data_column); } - // TODO(EIP-7732): implement outgoing payload envelopes by range and root - // responses once sync manager requests them. - Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => { - debug!("Requesting envelopes by root and by range not supported yet"); + Response::PayloadEnvelopesByRoot(payload_envelope) => { + self.on_payload_envelopes_by_root_response( + peer_id, + app_request_id, + payload_envelope, + ); + } + Response::PayloadEnvelopesByRange(payload_envelope) => { + self.on_payload_envelopes_by_range_response( + peer_id, + app_request_id, + payload_envelope, + ); } // Light client responses should not be received Response::LightClientBootstrap(_) @@ -809,6 +819,62 @@ impl Router { } } + pub fn on_payload_envelopes_by_root_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + payload_envelope: Option>>, + ) { + let sync_request_id = match app_request_id { + AppRequestId::Sync(sync_id) => match sync_id { + id @ SyncRequestId::SinglePayloadEnvelope { .. } => id, + other => { + crit!(request = ?other, "PayloadEnvelopesByRoot response on incorrect request"); + return; + } + }, + AppRequestId::Router => { + crit!(%peer_id, "All PayloadEnvelopesByRoot requests belong to sync"); + return; + } + AppRequestId::Internal => unreachable!("Handled internally"), + }; + + trace!( + %peer_id, + "Received PayloadEnvelopesByRoot Response" + ); + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + payload_envelope, + seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(), + }); + } + + pub fn on_payload_envelopes_by_range_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + payload_envelope: Option>>, + ) { + trace!( + %peer_id, + "Received PayloadEnvelopesByRange Response" + ); + + if let AppRequestId::Sync(sync_request_id) = app_request_id { + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + peer_id, + sync_request_id, + payload_envelope, + seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(), + }); + } else { + crit!("All payload envelopes by range responses should belong to sync"); + } + } + fn handle_beacon_processor_send_result( &mut self, result: Result<(), crate::network_beacon_processor::Error>, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index bd59e1df37..e5b2acdcac 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -134,6 +134,14 @@ pub enum SyncMessage { seen_timestamp: Duration, }, + /// A payload envelope has been received from the RPC. + RpcPayloadEnvelope { + sync_request_id: SyncRequestId, + peer_id: PeerId, + payload_envelope: Option>>, + seen_timestamp: Duration, + }, + /// A block with an unknown parent has been received. UnknownParentBlock(PeerId, Arc>, Hash256), @@ -853,6 +861,17 @@ impl SyncManager { } => { self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp) } + SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + payload_envelope, + seen_timestamp, + } => self.rpc_payload_envelope_received( + sync_request_id, + peer_id, + payload_envelope, + seen_timestamp, + ), SyncMessage::UnknownParentBlock(peer_id, block, block_root) => { let block_slot = block.slot(); let parent_root = block.parent_root(); @@ -1242,6 +1261,34 @@ impl SyncManager { } } + fn rpc_payload_envelope_received( + &mut self, + sync_request_id: SyncRequestId, + peer_id: PeerId, + payload_envelope: Option>>, + seen_timestamp: Duration, + ) { + match sync_request_id { + SyncRequestId::SinglePayloadEnvelope { id } => { + self.on_single_envelope_response( + id, + peer_id, + RpcEvent::from_chunk(payload_envelope, seen_timestamp), + ); + } + SyncRequestId::PayloadEnvelopesByRange(req_id) => { + self.on_payload_envelopes_by_range_response( + req_id, + peer_id, + RpcEvent::from_chunk(payload_envelope, seen_timestamp), + ); + } + _ => { + crit!(%peer_id, "bad request id for payload_envelope"); + } + } + } + fn on_single_blob_response( &mut self, id: SingleLookupReqId, diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index e866547b9f..c314825413 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -46,7 +46,7 @@ use ssz::{Decode, Encode}; use std::fmt; use std::future::Future; use std::time::Duration; -use types::{PayloadAttestationData, PayloadAttestationMessage}; +use types::{PayloadAttestationData, PayloadAttestationMessage, SignedProposerPreferences}; pub const V1: EndpointVersion = EndpointVersion(1); pub const V2: EndpointVersion = EndpointVersion(2); @@ -1849,6 +1849,46 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST validator/proposer_preferences` + pub async fn post_validator_proposer_preferences( + &self, + signed_preferences: &[SignedProposerPreferences], + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("proposer_preferences"); + + self.post_generic_with_consensus_version(path, &signed_preferences, None, fork_name) + .await?; + + Ok(()) + } + + /// `POST validator/proposer_preferences` (SSZ) + pub async fn post_validator_proposer_preferences_ssz( + &self, + signed_preferences: &Vec, + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("proposer_preferences"); + + let ssz_body = signed_preferences.as_ssz_bytes(); + + self.post_generic_with_consensus_version_and_ssz_body(path, ssz_body, None, fork_name) + .await?; + + Ok(()) + } + /// `POST beacon/rewards/sync_committee` pub async fn post_beacon_rewards_sync_committee( &self, diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index 1b32777678..cc9729b44d 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -22,11 +22,12 @@ use types::{ AbstractExecPayload, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, ExecutionPayloadEnvelope, Fork, FullPayload, Graffiti, Hash256, PayloadAttestationData, PayloadAttestationMessage, - SelectionProof, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, - SignedExecutionPayloadEnvelope, SignedRoot, SignedValidatorRegistrationData, - SignedVoluntaryExit, Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, - SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, - VoluntaryExit, graffiti::GraffitiString, + ProposerPreferences, SelectionProof, SignedAggregateAndProof, SignedBeaconBlock, + SignedContributionAndProof, SignedExecutionPayloadEnvelope, SignedProposerPreferences, + SignedRoot, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, + SyncAggregatorSelectionData, SyncCommitteeContribution, SyncCommitteeMessage, + SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, VoluntaryExit, + graffiti::GraffitiString, }; use validator_store::{ AggregateToSign, AttestationToSign, ContributionToSign, DoppelgangerStatus, @@ -1485,4 +1486,32 @@ impl ValidatorStore for LighthouseValidatorS signature, }) } + + async fn sign_proposer_preferences( + &self, + validator_pubkey: PublicKeyBytes, + preferences: ProposerPreferences, + ) -> Result { + let signing_context = self.signing_context( + Domain::ProposerPreferences, + preferences.proposal_slot.epoch(E::slots_per_epoch()), + ); + + let signing_method = self.doppelganger_bypassed_signing_method(validator_pubkey)?; + + let signature = signing_method + .get_signature::>( + SignableMessage::ProposerPreferences(&preferences), + signing_context, + &self.spec, + &self.task_executor, + ) + .await + .map_err(Error::SpecificError)?; + + Ok(SignedProposerPreferences { + message: preferences, + signature, + }) + } } diff --git a/validator_client/signing_method/src/lib.rs b/validator_client/signing_method/src/lib.rs index 2f80fa5761..0dfde98946 100644 --- a/validator_client/signing_method/src/lib.rs +++ b/validator_client/signing_method/src/lib.rs @@ -51,6 +51,7 @@ pub enum SignableMessage<'a, E: EthSpec, Payload: AbstractExecPayload = FullP VoluntaryExit(&'a VoluntaryExit), ExecutionPayloadEnvelope(&'a ExecutionPayloadEnvelope), PayloadAttestationData(&'a PayloadAttestationData), + ProposerPreferences(&'a ProposerPreferences), } impl> SignableMessage<'_, E, Payload> { @@ -74,6 +75,7 @@ impl> SignableMessage<'_, E, Payload SignableMessage::VoluntaryExit(exit) => exit.signing_root(domain), SignableMessage::ExecutionPayloadEnvelope(e) => e.signing_root(domain), SignableMessage::PayloadAttestationData(d) => d.signing_root(domain), + SignableMessage::ProposerPreferences(p) => p.signing_root(domain), } } } @@ -243,6 +245,9 @@ impl SigningMethod { SignableMessage::PayloadAttestationData(d) => { Web3SignerObject::PayloadAttestationData(d) } + SignableMessage::ProposerPreferences(p) => { + Web3SignerObject::ProposerPreferences(p) + } }; // Determine the Web3Signer message type. diff --git a/validator_client/signing_method/src/web3signer.rs b/validator_client/signing_method/src/web3signer.rs index c2b7e06f92..baabb37947 100644 --- a/validator_client/signing_method/src/web3signer.rs +++ b/validator_client/signing_method/src/web3signer.rs @@ -22,6 +22,7 @@ pub enum MessageType { // TODO(gloas) verify w/ web3signer specs ExecutionPayloadEnvelope, PayloadAttestation, + ProposerPreferences, } #[derive(Debug, PartialEq, Copy, Clone, Serialize)] @@ -80,6 +81,7 @@ pub enum Web3SignerObject<'a, E: EthSpec, Payload: AbstractExecPayload> { ValidatorRegistration(&'a ValidatorRegistrationData), ExecutionPayloadEnvelope(&'a ExecutionPayloadEnvelope), PayloadAttestationData(&'a PayloadAttestationData), + ProposerPreferences(&'a ProposerPreferences), } impl<'a, E: EthSpec, Payload: AbstractExecPayload> Web3SignerObject<'a, E, Payload> { @@ -147,6 +149,7 @@ impl<'a, E: EthSpec, Payload: AbstractExecPayload> Web3SignerObject<'a, E, Pa Web3SignerObject::ValidatorRegistration(_) => MessageType::ValidatorRegistration, Web3SignerObject::ExecutionPayloadEnvelope(_) => MessageType::ExecutionPayloadEnvelope, Web3SignerObject::PayloadAttestationData(_) => MessageType::PayloadAttestation, + Web3SignerObject::ProposerPreferences(_) => MessageType::ProposerPreferences, } } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index b412db45f6..71d9333493 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -47,6 +47,7 @@ use validator_services::{ latency_service, payload_attestation_service::PayloadAttestationService, preparation_service::{PreparationService, PreparationServiceBuilder}, + proposer_preferences_service::ProposerPreferencesService, sync_committee_service::SyncCommitteeService, }; use validator_store::ValidatorStore as ValidatorStoreTrait; @@ -85,6 +86,8 @@ pub struct ProductionValidatorClient { attestation_service: AttestationService, SystemTimeSlotClock>, sync_committee_service: SyncCommitteeService, SystemTimeSlotClock>, payload_attestation_service: PayloadAttestationService, SystemTimeSlotClock>, + proposer_preferences_service: + ProposerPreferencesService, SystemTimeSlotClock>, doppelganger_service: Option>, preparation_service: PreparationService, SystemTimeSlotClock>, validator_store: Arc>, @@ -563,6 +566,15 @@ impl ProductionValidatorClient { context.eth2_config.spec.clone(), ); + let proposer_preferences_service = ProposerPreferencesService::new( + duties_service.clone(), + validator_store.clone(), + slot_clock.clone(), + beacon_nodes.clone(), + context.executor.clone(), + context.eth2_config.spec.clone(), + ); + Ok(Self { context, duties_service, @@ -570,6 +582,7 @@ impl ProductionValidatorClient { attestation_service, sync_committee_service, payload_attestation_service, + proposer_preferences_service, doppelganger_service, preparation_service, validator_store, @@ -646,6 +659,11 @@ impl ProductionValidatorClient { .clone() .start_update_service() .map_err(|e| format!("Unable to start payload attestation service: {}", e))?; + + self.proposer_preferences_service + .clone() + .start_update_service() + .map_err(|e| format!("Unable to start proposer preferences service: {}", e))?; } self.preparation_service diff --git a/validator_client/validator_services/src/lib.rs b/validator_client/validator_services/src/lib.rs index 0169335a7f..c39ef4499b 100644 --- a/validator_client/validator_services/src/lib.rs +++ b/validator_client/validator_services/src/lib.rs @@ -5,5 +5,6 @@ pub mod latency_service; pub mod notifier_service; pub mod payload_attestation_service; pub mod preparation_service; +pub mod proposer_preferences_service; pub mod sync; pub mod sync_committee_service; diff --git a/validator_client/validator_services/src/proposer_preferences_service.rs b/validator_client/validator_services/src/proposer_preferences_service.rs new file mode 100644 index 0000000000..fbefdf5d96 --- /dev/null +++ b/validator_client/validator_services/src/proposer_preferences_service.rs @@ -0,0 +1,221 @@ +use crate::duties_service::DutiesService; +use beacon_node_fallback::BeaconNodeFallback; +use slot_clock::SlotClock; +use std::ops::Deref; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tokio::time::sleep; +use tracing::{debug, error, info, warn}; +use types::{ChainSpec, Epoch, EthSpec, ForkName, ProposerPreferences}; +use validator_store::ValidatorStore; + +pub struct Inner { + duties_service: Arc>, + validator_store: Arc, + slot_clock: T, + beacon_nodes: Arc>, + executor: TaskExecutor, + chain_spec: Arc, +} + +pub struct ProposerPreferencesService { + inner: Arc>, +} + +impl Clone for ProposerPreferencesService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Deref for ProposerPreferencesService { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +impl ProposerPreferencesService { + pub fn new( + duties_service: Arc>, + validator_store: Arc, + slot_clock: T, + beacon_nodes: Arc>, + executor: TaskExecutor, + chain_spec: Arc, + ) -> Self { + Self { + inner: Arc::new(Inner { + duties_service, + validator_store, + slot_clock, + beacon_nodes, + executor, + chain_spec, + }), + } + } + + pub fn start_update_service(self) -> Result<(), String> { + let slot_duration = self.chain_spec.get_slot_duration(); + info!("Proposer preferences service started"); + + let executor = self.executor.clone(); + + let interval_fut = async move { + loop { + let Some(current_slot) = self.slot_clock.now() else { + error!("Failed to read slot clock"); + sleep(slot_duration).await; + continue; + }; + + if !self + .chain_spec + .fork_name_at_slot::(current_slot) + .gloas_enabled() + { + let duration_to_next_epoch = self + .slot_clock + .duration_to_next_epoch(S::E::slots_per_epoch()) + .unwrap_or_else(|| slot_duration * S::E::slots_per_epoch() as u32); + sleep(duration_to_next_epoch).await; + continue; + } + + let current_epoch = current_slot.epoch(S::E::slots_per_epoch()); + let fork_name = self.chain_spec.fork_name_at_slot::(current_slot); + self.publish_proposer_preferences(current_epoch, fork_name) + .await; + + let duration_to_next_epoch = self + .slot_clock + .duration_to_next_epoch(S::E::slots_per_epoch()) + .unwrap_or_else(|| slot_duration * S::E::slots_per_epoch() as u32); + sleep(duration_to_next_epoch).await; + } + }; + + executor.spawn(interval_fut, "proposer_preferences_service"); + Ok(()) + } + + async fn publish_proposer_preferences(&self, current_epoch: Epoch, fork_name: ForkName) { + let (dependent_root, duties) = { + let proposers = self.duties_service.proposers.read(); + match proposers.get(¤t_epoch) { + Some((root, duties)) => (*root, duties.clone()), + None => return, + } + }; + + let preferences_to_sign: Vec<_> = { + let mut result = vec![]; + for duty in &duties { + let Some(proposal_data) = self.validator_store.proposal_data(&duty.pubkey) else { + warn!( + validator = ?duty.pubkey, + "Missing proposal data for proposer preferences" + ); + continue; + }; + let Some(fee_recipient) = proposal_data.fee_recipient else { + warn!( + validator = ?duty.pubkey, + "Missing fee recipient for proposer preferences" + ); + continue; + }; + result.push(( + duty.pubkey, + ProposerPreferences { + dependent_root, + proposal_slot: duty.slot, + validator_index: duty.validator_index, + fee_recipient, + gas_limit: proposal_data.gas_limit, + }, + )); + } + result + }; + + if preferences_to_sign.is_empty() { + return; + } + + debug!( + %current_epoch, + count = preferences_to_sign.len(), + "Signing proposer preferences" + ); + + let mut signed = Vec::with_capacity(preferences_to_sign.len()); + for (pubkey, preferences) in preferences_to_sign { + match self + .validator_store + .sign_proposer_preferences(pubkey, preferences) + .await + { + Ok(signed_prefs) => signed.push(signed_prefs), + Err(e) => { + error!( + error = ?e, + validator = ?pubkey, + "Failed to sign proposer preferences" + ); + } + } + } + + if signed.is_empty() { + return; + } + + let count = signed.len(); + let signed = Arc::new(signed); + let result = self + .beacon_nodes + .first_success(|beacon_node| { + let signed = signed.clone(); + async move { + match beacon_node + .post_validator_proposer_preferences_ssz(&signed, fork_name) + .await + { + Ok(()) => Ok(()), + Err(ssz_err) => { + debug!(error = ?ssz_err, "SSZ publish failed, falling back to JSON"); + beacon_node + .post_validator_proposer_preferences(&signed, fork_name) + .await + .map_err(|e| { + format!("Failed to publish proposer preferences: {e:?}") + }) + } + } + } + }) + .await; + + match result { + Ok(()) => { + info!( + %current_epoch, + %count, + "Successfully published proposer preferences" + ); + } + Err(e) => { + error!( + error = %e, + %current_epoch, + "Failed to publish proposer preferences" + ); + } + } + } +} diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index 4e5b415a41..d40c7994f1 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -8,10 +8,10 @@ use std::sync::Arc; use types::{ Address, Attestation, AttestationError, BlindedBeaconBlock, Epoch, EthSpec, ExecutionPayloadEnvelope, Graffiti, Hash256, PayloadAttestationData, PayloadAttestationMessage, - SelectionProof, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedContributionAndProof, - SignedExecutionPayloadEnvelope, SignedValidatorRegistrationData, Slot, - SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, - ValidatorRegistrationData, + ProposerPreferences, SelectionProof, SignedAggregateAndProof, SignedBlindedBeaconBlock, + SignedContributionAndProof, SignedExecutionPayloadEnvelope, SignedProposerPreferences, + SignedValidatorRegistrationData, Slot, SyncCommitteeContribution, SyncCommitteeMessage, + SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, }; #[derive(Debug, PartialEq, Clone)] @@ -213,6 +213,13 @@ pub trait ValidatorStore: Send + Sync { data: PayloadAttestationData, ) -> impl Future>> + Send; + /// Sign a `ProposerPreferences` message. + fn sign_proposer_preferences( + &self, + validator_pubkey: PublicKeyBytes, + preferences: ProposerPreferences, + ) -> impl Future>> + Send; + /// Returns `ProposalData` for the provided `pubkey` if it exists in `InitializedValidators`. /// `ProposalData` fields include defaulting logic described in `get_fee_recipient_defaulting`, /// `get_gas_limit_defaulting`, and `get_builder_proposals_defaulting`.