diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index abb183d847..26276457c0 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1702,7 +1702,7 @@ impl BeaconChain { /// non-existing/inactive validators will have `None` values. pub fn validator_inclusion_list_duties( &self, - validator_indices: &[u64], + validator_indices_pubkeys: &[(usize, PublicKeyBytes)], epoch: Epoch, head_block_root: Hash256, ) -> Result<(Vec>, Hash256), Error> { @@ -1732,11 +1732,11 @@ impl BeaconChain { let Some(head_beacon_state) = head_beacon_state else { return Err(Error::MissingBeaconState(head_block.root)); }; - let duties = validator_indices + let duties = validator_indices_pubkeys .iter() - .map(|&validator_index| { + .map(|(validator_index, pubkey_bytes)| { head_beacon_state - .get_inclusion_list_duties(validator_index as usize, epoch, &self.spec) + .get_inclusion_list_duties(*pubkey_bytes, *validator_index, epoch, &self.spec) .map_err(Error::InclusionListDutiesError) }) .collect::, _>>()?; @@ -2124,13 +2124,22 @@ impl BeaconChain { // Use a blocking task since blocking the core executor on the canonical head read lock can // block the core tokio executor. let chain = self.clone(); - let (head_slot, head_hash) = self + let (head_slot, parent_hash) = self .spawn_blocking_handle( move || { let cached_head = chain.canonical_head.cached_head(); let head_slot = cached_head.head_slot(); - let head_hash = cached_head.head_hash(); - (head_slot, head_hash) + // let head_hash = cached_head.head_hash(); + if let Ok(execution_payload) = cached_head + .snapshot + .beacon_block + .message() + .execution_payload() + { + (head_slot, Some(execution_payload.parent_hash())) + } else { + (head_slot, None) + } }, "produce_inclusion_list_head_read", ) @@ -2138,11 +2147,8 @@ impl BeaconChain { // NOTE: not sure how to handle scenario where head hash is `None` i.e. pre-bellatrix, which // is pre-electra. - let Some(head_hash) = head_hash else { - debug!( - self.log, - "Attempted to produce inclusion list pre-bellatrix" - ); + let Some(parent_hash) = parent_hash else { + debug!(self.log, "Failed to fetch parent_hash"); return Ok(None); }; @@ -2172,12 +2178,12 @@ impl BeaconChain { return Ok(None); } + debug!(self.log, "Attempt to fetch IL from EL"; "parent_hash" => %parent_hash, "current_slot" => %current_slot); // Retrieve the inclusion list from the execution layer. let inclusion_list = execution_layer - .get_inclusion_list(head_hash.into_root()) + .get_inclusion_list(parent_hash.0) .await .map_err(|e| Error::ExecutionLayerGetInclusionListFailed(Box::new(e)))?; - debug!(self.log, "Inclusion list fetched from EL"; "tx_count" => inclusion_list.len()); Ok(Some(inclusion_list)) @@ -7364,6 +7370,7 @@ impl BeaconChain { pub async fn set_unsatisfied_inclusion_list_block( self: &Arc, + slot: Slot, block_root: Hash256, ) -> Result<(), Error> { let chain = self.clone(); @@ -7373,7 +7380,7 @@ impl BeaconChain { chain .canonical_head .fork_choice_write_lock() - .on_invalid_inclusion_list_payload(block_root) + .on_invalid_inclusion_list_payload(slot, block_root) }, "invalid_inclusion_list_payload", ) diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index 8c19b8ad0d..7b0732f70e 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -141,7 +141,7 @@ pub struct BeaconForkChoiceStore, Cold: ItemStore< proposer_boost_root: Hash256, equivocating_indices: BTreeSet, inclusion_list_equivocators: HashMap<(Slot, Hash256), BTreeSet>, - unsatisfied_inclusion_list_block: Hash256, + unsatisfied_inclusion_list_blocks: HashMap, _phantom: PhantomData, } @@ -192,7 +192,7 @@ where proposer_boost_root: Hash256::zero(), equivocating_indices: BTreeSet::new(), inclusion_list_equivocators: HashMap::new(), - unsatisfied_inclusion_list_block: Hash256::zero(), + unsatisfied_inclusion_list_blocks: HashMap::new(), _phantom: PhantomData, }) } @@ -210,7 +210,6 @@ where unrealized_finalized_checkpoint: self.unrealized_finalized_checkpoint, proposer_boost_root: self.proposer_boost_root, equivocating_indices: self.equivocating_indices.clone(), - unsatisfied_inclusion_list_block: self.unsatisfied_inclusion_list_block, } } @@ -233,7 +232,7 @@ where proposer_boost_root: persisted.proposer_boost_root, equivocating_indices: persisted.equivocating_indices, inclusion_list_equivocators: HashMap::new(), - unsatisfied_inclusion_list_block: persisted.unsatisfied_inclusion_list_block, + unsatisfied_inclusion_list_blocks: HashMap::new(), _phantom: PhantomData, }) } @@ -352,12 +351,17 @@ where self.equivocating_indices.extend(indices); } - fn set_unsatisfied_inclusion_list_block(&mut self, block_root: Hash256) { - self.unsatisfied_inclusion_list_block = block_root; + fn set_unsatisfied_inclusion_list_block(&mut self, slot: Slot, block_root: Hash256) { + self.unsatisfied_inclusion_list_blocks + .insert(slot, block_root); } - fn unsatisfied_inclusion_list_block(&self) -> &Hash256 { - &self.unsatisfied_inclusion_list_block + fn unsatisfied_inclusion_list_block(&self, slot: Slot) -> Option<&Hash256> { + self.unsatisfied_inclusion_list_blocks.get(&slot) + } + + fn unsatisfied_inclusion_list_blocks(&self) -> &HashMap { + &self.unsatisfied_inclusion_list_blocks } } @@ -375,5 +379,4 @@ pub struct PersistedForkChoiceStore { pub unrealized_finalized_checkpoint: Checkpoint, pub proposer_boost_root: Hash256, pub equivocating_indices: BTreeSet, - pub unsatisfied_inclusion_list_block: Hash256, } diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index 3408bf8a95..ddc26132da 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -113,7 +113,7 @@ impl PayloadNotifier { .get_inclusion_list_transactions(block.slot()) .unwrap_or(vec![].into()); - debug!(chain.log, "Adding inclusion list transactions in the Payload Notifier"; "count" => inclusion_list_transactions.len()); + debug!(chain.log, "Adding inclusion list transactions in the Payload Notifier"; "count" => inclusion_list_transactions.len(), "slot" => block.slot()); inclusion_list_transactions } else { vec![].into() @@ -215,8 +215,15 @@ async fn notify_new_payload( // transactions for this slot, update the fork choice store before processing // the invalid EL payload. if *validation_error == Some("INVALID_INCLUSION_LIST".to_string()) { + debug!( + chain.log, + "Unsatisfied inclusion list"; + ); chain - .set_unsatisfied_inclusion_list_block(block.tree_hash_root()) + .set_unsatisfied_inclusion_list_block( + block.slot(), + block.tree_hash_root(), + ) .await?; } diff --git a/beacon_node/beacon_chain/src/inclusion_list_verification.rs b/beacon_node/beacon_chain/src/inclusion_list_verification.rs index 1edb981d16..28abb1325a 100644 --- a/beacon_node/beacon_chain/src/inclusion_list_verification.rs +++ b/beacon_node/beacon_chain/src/inclusion_list_verification.rs @@ -6,13 +6,9 @@ use types::{Domain, EthSpec, SignedInclusionList, SignedRoot, Slot}; #[derive(Debug, AsRefStr)] pub enum GossipInclusionListError { - FutureSlot { + InvalidSlot { message_slot: Slot, - latest_permissible_slot: Slot, - }, - PastSlot { - message_slot: Slot, - earliest_permissible_slot: Slot, + current_slot: Slot, }, InvalidCommitteeRoot, ValidatorNotInCommittee, @@ -40,24 +36,16 @@ impl GossipVerifiedInclusionList { ) -> Result { // the slot is equal to the previous slot or the current slot let message_slot = signed_il.message.slot; - let earliest_permissible_slot = chain + + let current_slot = chain .slot_clock - .now_with_past_tolerance(chain.spec.maximum_gossip_clock_disparity()) + .now() .ok_or(BeaconChainError::UnableToReadSlot)?; - if message_slot < earliest_permissible_slot { - return Err(GossipInclusionListError::PastSlot { + + if message_slot != current_slot + 1 { + return Err(GossipInclusionListError::InvalidSlot { message_slot, - earliest_permissible_slot, - }); - } - let latest_permissible_slot = chain - .slot_clock - .now_with_future_tolerance(chain.spec.maximum_gossip_clock_disparity()) - .ok_or(BeaconChainError::UnableToReadSlot)?; - if message_slot > latest_permissible_slot { - return Err(GossipInclusionListError::FutureSlot { - message_slot, - latest_permissible_slot, + current_slot, }); } diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 2d5716ea56..fd69987344 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -731,10 +731,7 @@ impl HttpJsonRpc { pub async fn get_inclusion_list( &self, parent_hash: Hash256, - ) -> Result< - VariableList, E::MaxTransactionsPerInclusionList>, - Error, - > { + ) -> Result>, Error> { let params = json!([parent_hash]); self.rpc_request( @@ -852,11 +849,18 @@ impl HttpJsonRpc { Ok(response.into()) } - // TODO(fulu): switch to v5 endpoint when the EL is ready for Fulu - pub async fn new_payload_v4_fulu( + pub async fn new_payload_v5_fulu( &self, new_payload_request_fulu: NewPayloadRequestFulu<'_, E>, ) -> Result { + // TODO(focil) clean this up? + let mut il_transactions = vec![]; + for transaction in new_payload_request_fulu.il_transactions { + if let Ok(hex_tx) = String::from_utf8(transaction.into()).map(|v| format!("0x{}", v)) { + il_transactions.push(hex_tx); + } + } + let params = json!([ JsonExecutionPayload::V5(new_payload_request_fulu.execution_payload.clone().into()), new_payload_request_fulu.versioned_hashes, @@ -864,11 +868,12 @@ impl HttpJsonRpc { new_payload_request_fulu .execution_requests .get_execution_requests_list(), + il_transactions ]); let response: JsonPayloadStatusV1 = self .rpc_request( - ENGINE_NEW_PAYLOAD_V4, + ENGINE_NEW_PAYLOAD_V5, params, ENGINE_NEW_PAYLOAD_TIMEOUT * self.execution_timeout_multiplier, ) @@ -1301,11 +1306,10 @@ impl HttpJsonRpc { } } NewPayloadRequest::Fulu(new_payload_request_fulu) => { - // TODO(fulu): switch to v5 endpoint when the EL is ready for Fulu - if engine_capabilities.new_payload_v4 { - self.new_payload_v4_fulu(new_payload_request_fulu).await + if engine_capabilities.new_payload_v5 { + self.new_payload_v5_fulu(new_payload_request_fulu).await } else { - Err(Error::RequiredMethodUnsupported("engine_newPayloadV4")) + Err(Error::RequiredMethodUnsupported("engine_newPayloadV5")) } } } diff --git a/beacon_node/execution_layer/src/engine_api/new_payload_request.rs b/beacon_node/execution_layer/src/engine_api/new_payload_request.rs index c622d34855..b2801eef2f 100644 --- a/beacon_node/execution_layer/src/engine_api/new_payload_request.rs +++ b/beacon_node/execution_layer/src/engine_api/new_payload_request.rs @@ -47,7 +47,7 @@ pub struct NewPayloadRequest<'block, E: EthSpec> { pub parent_beacon_block_root: Hash256, #[superstruct(only(Electra, Fulu))] pub execution_requests: &'block ExecutionRequests, - #[superstruct(only(Electra, Fulu))] + #[superstruct(only(Fulu))] pub il_transactions: InclusionListTransactions, } @@ -204,7 +204,6 @@ impl<'a, E: EthSpec> NewPayloadRequest<'a, E> { .collect(), parent_beacon_block_root: block_ref.parent_root, execution_requests: &block_ref.body.execution_requests, - il_transactions, })), BeaconBlockRef::Fulu(block_ref) => Ok(Self::Fulu(NewPayloadRequestFulu { execution_payload: &block_ref.body.execution_payload.execution_payload, @@ -259,7 +258,6 @@ impl<'a, E: EthSpec> TryFrom> for NewPayloadRequest<'a, E> .collect(), parent_beacon_block_root: block_ref.parent_root, execution_requests: &block_ref.body.execution_requests, - il_transactions: vec![].into(), })), BeaconBlockRef::Fulu(block_ref) => Ok(Self::Fulu(NewPayloadRequestFulu { execution_payload: &block_ref.body.execution_payload.execution_payload, diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index f21ab130a5..8ff677dda4 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -166,6 +166,20 @@ pub enum Error { BeaconStateError(BeaconStateError), PayloadTypeMismatch, VerifyingVersionedHashes(versioned_hashes::Error), + HexError(hex::FromHexError), + SszTypeError(ssz_types::Error), +} + +impl From for Error { + fn from(e: ssz_types::Error) -> Self { + Error::SszTypeError(e) + } +} + +impl From for Error { + fn from(e: hex::FromHexError) -> Self { + Error::HexError(e) + } } impl From for Error { @@ -1958,12 +1972,24 @@ impl ExecutionLayer { parent_hash: Hash256, ) -> Result, Error> { debug!(self.log(), "Requesting inclusion list from EL"; "parent_hash" => %parent_hash); - let transactions = self + let raw_transactions = self .engine() .api .get_inclusion_list::(parent_hash) .await?; - Ok(transactions) + // TODO(focil) clean this up? + let mut transactions = vec![]; + + let Some(raw_transactions) = raw_transactions else { + debug!(self.log(), "The EL sent an empty inclusion list"; "parent_hash" => %parent_hash); + return Ok(transactions.into()); + }; + for raw_tx in raw_transactions { + let decoded_hex_tx = + VariableList::new(hex::decode(raw_tx.strip_prefix("0x").unwrap_or(&raw_tx))?)?; + transactions.push(decoded_hex_tx); + } + Ok(transactions.into()) } } diff --git a/beacon_node/http_api/src/inclusion_list_duties.rs b/beacon_node/http_api/src/inclusion_list_duties.rs index f998ecaaef..11f66e90c7 100644 --- a/beacon_node/http_api/src/inclusion_list_duties.rs +++ b/beacon_node/http_api/src/inclusion_list_duties.rs @@ -1,10 +1,10 @@ use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2::types::{self as api_types}; use slot_clock::SlotClock; -use types::{Epoch, EthSpec, Hash256, InclusionListDuty}; +use types::{Epoch, EthSpec, Hash256, InclusionListDuty, PublicKeyBytes}; /// The struct that is returned to the requesting HTTP client. -type ApiDuties = api_types::DutiesResponse>; +type ApiDuties = api_types::DutiesResponse>; /// Handles a request from the HTTP API for inclusion list duties. pub fn inclusion_list_duties( @@ -13,6 +13,15 @@ pub fn inclusion_list_duties( chain: &BeaconChain, ) -> Result { let current_epoch = chain.epoch().map_err(warp_utils::reject::unhandled_error)?; + let request_indices = request_indices + .iter() + .map(|i| *i as usize) + .collect::>(); + let indices_and_pubkeys: Vec<(usize, PublicKeyBytes)> = chain + .validator_pubkey_bytes_many(&request_indices) + .map_err(|_| warp_utils::reject::custom_server_error("unable to fetch pubkey".into()))? + .into_iter() + .collect(); // Determine what the current epoch would be if we fast-forward our system clock by // `MAXIMUM_GOSSIP_CLOCK_DISPARITY`. @@ -32,10 +41,10 @@ pub fn inclusion_list_duties( { let head_block_root = chain.canonical_head.cached_head().head_block_root(); let (duties, dependent_root) = chain - .validator_inclusion_list_duties(request_indices, request_epoch, head_block_root) + .validator_inclusion_list_duties(&indices_and_pubkeys, request_epoch, head_block_root) .map_err(warp_utils::reject::unhandled_error)?; //.map_err(warp_utils::reject::beacon_chain_error)?; - convert_to_api_response(duties, request_indices, dependent_root, chain) + convert_to_api_response(duties, &request_indices, dependent_root, chain) } else if request_epoch > current_epoch + 1 { Err(warp_utils::reject::custom_bad_request(format!( "request epoch {} is more than one epoch past the current epoch {}", @@ -52,13 +61,14 @@ pub fn inclusion_list_duties( } } -/// Convert the internal representation of attester duties into the format returned to the HTTP +// TODO(focil) unused chain +/// Convert the internal representation of inclusion duties into the format returned to the HTTP /// client. fn convert_to_api_response( duties: Vec>, - indices: &[u64], + indices: &[usize], dependent_root: Hash256, - chain: &BeaconChain, + _chain: &BeaconChain, ) -> Result { // Protect against an inconsistent slot clock. if duties.len() != indices.len() { @@ -69,26 +79,23 @@ fn convert_to_api_response( ))); } - let usize_indices = indices.iter().map(|i| *i as usize).collect::>(); - let index_to_pubkey_map = chain - .validator_pubkey_bytes_many(&usize_indices) - .map_err(warp_utils::reject::unhandled_error)?; + // TODO(focil) + // let usize_indices = indices.iter().map(|i| *i as usize).collect::>(); + // let index_to_pubkey_map = chain + // .validator_pubkey_bytes_many(indices) + // .map_err(warp_utils::reject::unhandled_error)?; // .map_err(warp_utils::reject::beacon_chain_error)?; let data = duties .into_iter() .zip(indices) - .filter_map(|(duty_opt, &validator_index)| { + .filter_map(|(duty_opt, _)| { let duty = duty_opt?; - Some(api_types::InclusionListDutyData { - validator_index, - pubkey: *index_to_pubkey_map.get(&(validator_index as usize))?, - slot: duty.slot, - }) + Some(duty) }) .collect::>(); - // TODO: account for optimistic execution + // TODO(focil): account for optimistic execution Ok(api_types::DutiesResponse { dependent_root, execution_optimistic: None, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 0467657989..ae39353586 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -3584,11 +3584,13 @@ pub fn serve( .and(not_while_syncing_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) + .and(log_filter.clone()) .then( |query: api_types::ValidatorInclusionListQuery, not_synced_filter: Result<(), Rejection>, task_spawner: TaskSpawner, - chain: Arc>| { + chain: Arc>, + log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { not_synced_filter?; @@ -3604,11 +3606,21 @@ pub fn serve( ))); } - let data = chain + let data = match chain .produce_inclusion_list(query.slot) .await .map(api_types::GenericResponse::from) - .map_err(warp_utils::reject::unhandled_error)?; + { + Ok(data) => data, + Err(e) => { + error!( + log, + "Failed producing IL"; + "err" => format!("{:?}", e), + ); + return Err(warp_utils::reject::unhandled_error(e)); + } + }; Ok::<_, warp::reject::Rejection>(warp::reply::json(&data).into_response()) }) }, diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index be64c8684f..08b39b9f4a 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -2181,8 +2181,7 @@ impl NetworkBeaconProcessor { .on_verified_inclusion_list(gossip_verified_il.signed_il); } Err(err) => match err { - GossipInclusionListError::FutureSlot { .. } - | GossipInclusionListError::PastSlot { .. } + GossipInclusionListError::InvalidSlot { .. } | GossipInclusionListError::ValidatorNotInCommittee | GossipInclusionListError::TooManyTransactions | GossipInclusionListError::InvalidSignature diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index b94115ced3..1ab07a122c 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -2476,7 +2476,7 @@ impl BeaconNodeHttpClient { pub async fn get_validator_inclusion_list( &self, slot: Slot, - ) -> Result>>, Error> { + ) -> Result>>, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() @@ -2565,7 +2565,7 @@ impl BeaconNodeHttpClient { &self, epoch: Epoch, indices: &[u64], - ) -> Result>, Error> { + ) -> Result>, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 202cd36f81..9de9f06d78 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -748,14 +748,6 @@ pub struct ProposerData { pub slot: Slot, } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct InclusionListDutyData { - pub pubkey: PublicKeyBytes, - #[serde(with = "serde_utils::quoted_u64")] - pub validator_index: u64, - pub slot: Slot, -} - #[derive(Clone, Deserialize)] pub struct ValidatorBlocksQuery { pub randao_reveal: SignatureBytes, diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index fcf6e65460..b25616a3e1 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -390,7 +390,7 @@ where *fc_store.finalized_checkpoint(), current_epoch_shuffling_id, next_epoch_shuffling_id, - *fc_store.unsatisfied_inclusion_list_block(), + fc_store.unsatisfied_inclusion_list_blocks().clone(), execution_status, )?; @@ -628,9 +628,9 @@ where } // TODO(focil) add documentation - pub fn on_invalid_inclusion_list_payload(&mut self, block_root: Hash256) { + pub fn on_invalid_inclusion_list_payload(&mut self, slot: Slot, block_root: Hash256) { self.fc_store - .set_unsatisfied_inclusion_list_block(block_root); + .set_unsatisfied_inclusion_list_block(slot, block_root); } /// Add `block` to the fork choice DAG. diff --git a/consensus/fork_choice/src/fork_choice_store.rs b/consensus/fork_choice/src/fork_choice_store.rs index b33622d7b9..3953469cd8 100644 --- a/consensus/fork_choice/src/fork_choice_store.rs +++ b/consensus/fork_choice/src/fork_choice_store.rs @@ -1,8 +1,8 @@ use proto_array::JustifiedBalances; use std::collections::BTreeSet; +use std::collections::HashMap; use std::fmt::Debug; use types::{AbstractExecPayload, BeaconBlockRef, BeaconState, Checkpoint, EthSpec, Hash256, Slot}; - /// Approximates the `Store` in "Ethereum 2.0 Phase 0 -- Beacon Chain Fork Choice": /// /// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#store @@ -80,9 +80,12 @@ pub trait ForkChoiceStore: Sized { /// Adds to the set of equivocating indices. fn extend_equivocating_indices(&mut self, indices: impl IntoIterator); - /// Returns the `unsatisfied_inclusion_list_block`. - fn unsatisfied_inclusion_list_block(&self) -> &Hash256; + /// Returns the `unsatisfied_inclusion_list_blocks` mapping. + fn unsatisfied_inclusion_list_blocks(&self) -> &HashMap; + + /// Returns the `unsatisfied_inclusion_list_block` for the given slot. + fn unsatisfied_inclusion_list_block(&self, slot: Slot) -> Option<&Hash256>; /// Sets the `unsatisfied_inclusion_list_block`. - fn set_unsatisfied_inclusion_list_block(&mut self, block_root: Hash256); + fn set_unsatisfied_inclusion_list_block(&mut self, slot: Slot, block_root: Hash256); } diff --git a/consensus/proto_array/src/fork_choice_test_definition.rs b/consensus/proto_array/src/fork_choice_test_definition.rs index 1a824c7ff5..42e7e0ac8b 100644 --- a/consensus/proto_array/src/fork_choice_test_definition.rs +++ b/consensus/proto_array/src/fork_choice_test_definition.rs @@ -6,7 +6,7 @@ mod votes; use crate::proto_array_fork_choice::{Block, ExecutionStatus, ProtoArrayForkChoice}; use crate::{InvalidationOperation, JustifiedBalances}; use serde::{Deserialize, Serialize}; -use std::collections::BTreeSet; +use std::collections::{BTreeSet, HashMap}; use types::{ AttestationShufflingId, Checkpoint, Epoch, EthSpec, ExecutionBlockHash, FixedBytesExtended, Hash256, MainnetEthSpec, Slot, @@ -87,7 +87,7 @@ impl ForkChoiceTestDefinition { self.finalized_checkpoint, junk_shuffling_id.clone(), junk_shuffling_id, - Hash256::ZERO, + HashMap::new(), ExecutionStatus::Optimistic(ExecutionBlockHash::zero()), ) .expect("should create fork choice struct"); diff --git a/consensus/proto_array/src/proto_array.rs b/consensus/proto_array/src/proto_array.rs index 6480a84d8c..3fdd3f18e2 100644 --- a/consensus/proto_array/src/proto_array.rs +++ b/consensus/proto_array/src/proto_array.rs @@ -134,7 +134,7 @@ pub struct ProtoArray { pub finalized_checkpoint: Checkpoint, pub nodes: Vec, pub indices: HashMap, - pub unsatisfied_inclusion_list_block: Hash256, + pub unsatisfied_inclusion_list_blocks: HashMap, pub previous_proposer_boost: ProposerBoost, } @@ -197,8 +197,14 @@ impl ProtoArray { let execution_status_is_invalid = node.execution_status.is_invalid(); // TODO(focil) seems sketchy... + // modify is viable for head + // debug/fork_choice let mut node_delta = if execution_status_is_invalid - || node.root == self.unsatisfied_inclusion_list_block + || node.root + == *self + .unsatisfied_inclusion_list_blocks + .get(¤t_slot) + .unwrap_or(&Hash256::ZERO) { // If the node has an invalid execution payload, or the payload doesn't satisfy // an inclusion list, reduce its weight to zero. @@ -892,7 +898,13 @@ impl ProtoArray { return false; } - if node.root == self.unsatisfied_inclusion_list_block { + // TODO(focil) unwrap_or + if node.root + == *self + .unsatisfied_inclusion_list_blocks + .get(¤t_slot) + .unwrap_or(&Hash256::ZERO) + { return false; } diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs index 6589a47734..be73ce2a21 100644 --- a/consensus/proto_array/src/proto_array_fork_choice.rs +++ b/consensus/proto_array/src/proto_array_fork_choice.rs @@ -370,7 +370,7 @@ impl ProtoArrayForkChoice { finalized_checkpoint: Checkpoint, current_epoch_shuffling_id: AttestationShufflingId, next_epoch_shuffling_id: AttestationShufflingId, - unsatisfied_inclusion_list_block: Hash256, + unsatisfied_inclusion_list_blocks: HashMap, execution_status: ExecutionStatus, ) -> Result { let mut proto_array = ProtoArray { @@ -379,7 +379,7 @@ impl ProtoArrayForkChoice { finalized_checkpoint, nodes: Vec::with_capacity(1), indices: HashMap::with_capacity(1), - unsatisfied_inclusion_list_block, + unsatisfied_inclusion_list_blocks, previous_proposer_boost: ProposerBoost::default(), }; @@ -1033,7 +1033,7 @@ mod test_compute_deltas { genesis_checkpoint, junk_shuffling_id.clone(), junk_shuffling_id.clone(), - Hash256::ZERO, + HashMap::new(), execution_status, ) .unwrap(); @@ -1160,7 +1160,7 @@ mod test_compute_deltas { genesis_checkpoint, junk_shuffling_id.clone(), junk_shuffling_id.clone(), - Hash256::ZERO, + HashMap::new(), execution_status, ) .unwrap(); diff --git a/consensus/proto_array/src/ssz_container.rs b/consensus/proto_array/src/ssz_container.rs index 31c6afcbb8..e6ffc2ac04 100644 --- a/consensus/proto_array/src/ssz_container.rs +++ b/consensus/proto_array/src/ssz_container.rs @@ -8,7 +8,7 @@ use ssz::{four_byte_option_impl, Encode}; use ssz_derive::{Decode, Encode}; use std::collections::HashMap; use superstruct::superstruct; -use types::{Checkpoint, Hash256}; +use types::{Checkpoint, Hash256, Slot}; // Define a "legacy" implementation of `Option` which uses four bytes for encoding the union // selector. @@ -27,7 +27,7 @@ pub struct SszContainer { pub nodes: Vec, pub indices: Vec<(Hash256, usize)>, pub previous_proposer_boost: ProposerBoost, - pub unsatisfied_inclusion_list_block: Hash256, + pub unsatisfied_inclusion_list_blocks: Vec<(Slot, Hash256)>, } impl From<&ProtoArrayForkChoice> for SszContainer { @@ -43,7 +43,11 @@ impl From<&ProtoArrayForkChoice> for SszContainer { nodes: proto_array.nodes.clone(), indices: proto_array.indices.iter().map(|(k, v)| (*k, *v)).collect(), previous_proposer_boost: proto_array.previous_proposer_boost, - unsatisfied_inclusion_list_block: proto_array.unsatisfied_inclusion_list_block, + unsatisfied_inclusion_list_blocks: proto_array + .unsatisfied_inclusion_list_blocks + .iter() + .map(|(k, v)| (*k, *v)) + .collect(), } } } @@ -58,8 +62,11 @@ impl TryFrom for ProtoArrayForkChoice { finalized_checkpoint: from.finalized_checkpoint, nodes: from.nodes, indices: from.indices.into_iter().collect::>(), - unsatisfied_inclusion_list_block: from.unsatisfied_inclusion_list_block, previous_proposer_boost: from.previous_proposer_boost, + unsatisfied_inclusion_list_blocks: from + .unsatisfied_inclusion_list_blocks + .into_iter() + .collect::>(), }; Ok(Self { diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index 34485ef69c..ba9b6025bc 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -1824,6 +1824,7 @@ impl BeaconState { pub fn get_inclusion_list_duties( &self, + pubkey: PublicKeyBytes, validator_index: usize, epoch: Epoch, spec: &ChainSpec, @@ -1837,6 +1838,7 @@ impl BeaconState { slot, validator_index, committee_root, + pubkey, })); } } diff --git a/consensus/types/src/beacon_state/inclusion_list_cache.rs b/consensus/types/src/beacon_state/inclusion_list_cache.rs index c28de6cc2a..9249e62ccd 100644 --- a/consensus/types/src/beacon_state/inclusion_list_cache.rs +++ b/consensus/types/src/beacon_state/inclusion_list_cache.rs @@ -19,41 +19,24 @@ struct Inner { } impl InclusionListCache { - pub fn initialize(&mut self, slot: Slot) { - let inner = Inner { - inclusion_lists: HashSet::new(), - inclusion_lists_seen: HashSet::new(), - inclusion_list_equivocators: HashSet::new(), - inclusion_list_transactions: HashSet::new(), - }; - - self.inner_map.insert(slot, inner); - } - pub fn clear_cache(&mut self, slot: Slot) { self.inner_map.remove(&slot); } pub fn on_inclusion_list(&mut self, inclusion_list: SignedInclusionList, log: &Logger) { - let Some(inner) = self.inner_map.get_mut(&inclusion_list.message.slot) else { - return; - }; + let slot = inclusion_list.message.slot; + let inner = self.inner_map.entry(slot).or_default(); if inner .inclusion_list_equivocators .contains(&inclusion_list.message.validator_index) { - return; - } - - if inner - .inclusion_lists_seen - .contains(&inclusion_list.message.validator_index) - && !inner.inclusion_lists.contains(&inclusion_list) - { - inner - .inclusion_list_equivocators - .insert(inclusion_list.message.validator_index); + debug!( + log, + "This validator was flagged for an equivocating inclusion list"; + "slot" => slot, + "validator_index" => inclusion_list.message.validator_index + ); return; } @@ -63,6 +46,27 @@ impl InclusionListCache { .contains(&inclusion_list.message.validator_index) && inner.inclusion_lists.contains(&inclusion_list) { + debug!( + log, + "Already seen identical inclusion list from this validator"; + ); + return; + } + + if inner + .inclusion_lists_seen + .contains(&inclusion_list.message.validator_index) + && !inner.inclusion_lists.contains(&inclusion_list) + { + debug!( + log, + "Equivocating inclusion list"; + "slot" => slot, + "validator_index" => inclusion_list.message.validator_index + ); + inner + .inclusion_list_equivocators + .insert(inclusion_list.message.validator_index); return; } @@ -78,7 +82,9 @@ impl InclusionListCache { debug!( log, - "Successfully added inclusion list transactions to the cache" + "Successfully added inclusion list transactions to the cache"; + "slot" => slot, + "total_count" => inner.inclusion_list_transactions.len() ); } @@ -86,7 +92,7 @@ impl InclusionListCache { &self, slot: Slot, ) -> Option> { - let Some(inner) = &self.inner_map.get(&slot) else { + let Some(inner) = self.inner_map.get(&slot) else { return None; }; diff --git a/consensus/types/src/inclusion_list_duty.rs b/consensus/types/src/inclusion_list_duty.rs index 4317abd5eb..9f7969f917 100644 --- a/consensus/types/src/inclusion_list_duty.rs +++ b/consensus/types/src/inclusion_list_duty.rs @@ -1,7 +1,7 @@ use crate::*; use serde::{Deserialize, Serialize}; -#[derive(arbitrary::Arbitrary, Debug, PartialEq, Clone, Copy, Default, Serialize, Deserialize)] +#[derive(arbitrary::Arbitrary, Debug, PartialEq, Clone, Copy, Serialize, Deserialize)] pub struct InclusionListDuty { /// The slot during which the validator must produce an inclusion list. pub slot: Slot, @@ -10,4 +10,6 @@ pub struct InclusionListDuty { pub validator_index: u64, /// The hash tree root of the inclusion list committee. pub committee_root: Hash256, + /// The pubkey of the validator. + pub pubkey: PublicKeyBytes, } diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index 654dfc1754..0f08e5bd94 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -13,7 +13,7 @@ use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; use doppelganger_service::DoppelgangerStatus; use environment::RuntimeContext; use eth2::types::{ - AttesterData, BeaconCommitteeSubscription, DutiesResponse, InclusionListDutyData, ProposerData, + AttesterData, BeaconCommitteeSubscription, DutiesResponse, InclusionListDuty, ProposerData, StateId, ValidatorId, }; use futures::{stream, StreamExt}; @@ -207,7 +207,7 @@ type DependentRoot = Hash256; type AttesterMap = HashMap>; type ProposerMap = HashMap)>; type InclusionListDutiesMap = - HashMap>; + HashMap>; /// See the module-level documentation. pub struct DutiesService { @@ -334,8 +334,8 @@ impl DutiesService { .collect() } - /// Returns all `InclusionListDutyData` for the given `slot`. - pub fn inclusion_list_duties(&self, slot: Slot) -> Vec { + /// Returns all `InclusionListDuty` for the given `slot`. + pub fn inclusion_list_duties(&self, slot: Slot) -> Vec { let epoch = slot.epoch(E::slots_per_epoch()); if !self.spec.is_focil_enabled_for_epoch(epoch) { @@ -1370,7 +1370,7 @@ async fn poll_beacon_inclusion_list_duties_for_epoch response.data.len(), + ); let dependent_root = response.dependent_root; // Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch. @@ -1443,7 +1449,7 @@ async fn poll_beacon_inclusion_list_duties_for_epoch new_duties.len(), ); - // Update the duties service with the new `InclusionListDutyData` messages. + // Update the duties service with the new `InclusionListDuty` messages. let mut inclusion_list_duties = duties_service.inclusion_list_duties.write(); // TODO(focil) this variable is unused at the moment let _current_slot = duties_service @@ -1490,7 +1496,7 @@ async fn post_validator_duties_inclusion_list>, epoch: Epoch, validator_indices: &[u64], -) -> Result>, Error> { +) -> Result>, Error> { duties_service .beacon_nodes .first_success(|beacon_node| async move { diff --git a/validator_client/validator_services/src/inclusion_list_service.rs b/validator_client/validator_services/src/inclusion_list_service.rs index d7155f9a76..3db0847ae5 100644 --- a/validator_client/validator_services/src/inclusion_list_service.rs +++ b/validator_client/validator_services/src/inclusion_list_service.rs @@ -1,14 +1,13 @@ use crate::duties_service::DutiesService; use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; use environment::RuntimeContext; -use eth2::types::InclusionListDutyData; use futures::future::join_all; -use slog::{crit, error, info, trace, warn}; +use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; use tokio::time::{sleep, Duration}; -use types::{ChainSpec, EthSpec, Slot}; +use types::{ChainSpec, EthSpec, InclusionList, InclusionListDuty, Slot}; use validator_store::{Error as ValidatorStoreError, ValidatorStore}; /// Helper to minimise `Arc` usage. @@ -118,9 +117,18 @@ impl InclusionListService { _slot_duration: Duration, spec: &ChainSpec, ) -> Result<(), String> { + debug!( + self.context.log(), + "Spawning inclusion list task"; + ); + let next_slot = self.slot_clock.now().ok_or("Failed to read slot clock")? + 1; if !spec.is_focil_enabled_for_epoch(next_slot.epoch(E::slots_per_epoch())) { + debug!( + self.context.log(), + "FOCIL not enabled"; + ); return Ok(()); } @@ -153,7 +161,7 @@ impl InclusionListService { async fn produce_and_publish_inclusion_lists( self, slot: Slot, - validator_duties: Vec, + validator_duties: Vec, ) -> Result<(), ()> { let log = self.context.log(); let validator_store = self.validator_store.clone(); @@ -178,12 +186,12 @@ impl InclusionListService { ) })?; - let inclusion_list = self + let inclusion_list_transactions = self .beacon_nodes .first_success(|beacon_node| async move { // TODO(focil) add timer metric beacon_node - .get_validator_inclusion_list(slot) + .get_validator_inclusion_list::(slot) .await .map_err(|e| format!("Failed to produce inclusion list: {:?}", e)) .map(|result| result.ok_or("Inclusion list unavailable".to_string()))? @@ -201,6 +209,12 @@ impl InclusionListService { // Create futures to produce signed `InclusionList` objects. let signing_futures = validator_duties.iter().map(|duty| { + let inclusion_list = InclusionList { + slot, + transactions: inclusion_list_transactions.clone(), + inclusion_list_committee_root: duty.committee_root, + validator_index: duty.validator_index, + }; let inclusion_list = inclusion_list.clone(); let validator_store = Arc::clone(&validator_store); async move {