diff --git a/Cargo.lock b/Cargo.lock index be6e2bf3f3..b3b4069e8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -808,7 +808,7 @@ dependencies = [ [[package]] name = "beacon_node" -version = "7.0.0-beta.4" +version = "7.0.0-beta.5" dependencies = [ "account_utils", "beacon_chain", @@ -1046,7 +1046,7 @@ dependencies = [ [[package]] name = "boot_node" -version = "7.0.0-beta.4" +version = "7.0.0-beta.5" dependencies = [ "beacon_node", "bytes", @@ -4690,7 +4690,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "lcli" -version = "7.0.0-beta.4" +version = "7.0.0-beta.5" dependencies = [ "account_utils", "beacon_chain", @@ -5252,7 +5252,7 @@ dependencies = [ [[package]] name = "lighthouse" -version = "7.0.0-beta.4" +version = "7.0.0-beta.5" dependencies = [ "account_manager", "account_utils", @@ -10230,7 +10230,7 @@ dependencies = [ [[package]] name = "xdelta3" version = "0.1.5" -source = "git+http://github.com/sigp/xdelta3-rs?rev=50d63cdf1878e5cf3538e9aae5eed34a22c64e4a#50d63cdf1878e5cf3538e9aae5eed34a22c64e4a" +source = "git+http://github.com/sigp/xdelta3-rs?rev=4db64086bb02e9febb584ba93b9d16bb2ae3825a#4db64086bb02e9febb584ba93b9d16bb2ae3825a" dependencies = [ "bindgen", "cc", diff --git a/Cargo.toml b/Cargo.toml index 9d9141c922..5284713fc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -278,7 +278,7 @@ validator_metrics = { path = "validator_client/validator_metrics" } validator_store = { path = "validator_client/validator_store" } validator_test_rig = { path = "testing/validator_test_rig" } warp_utils = { path = "common/warp_utils" } -xdelta3 = { git = "http://github.com/sigp/xdelta3-rs", rev = "50d63cdf1878e5cf3538e9aae5eed34a22c64e4a" } +xdelta3 = { git = "http://github.com/sigp/xdelta3-rs", rev = "4db64086bb02e9febb584ba93b9d16bb2ae3825a" } zstd = "0.13" [profile.maxperf] diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index a537a1722c..cf963535c7 100644 --- a/beacon_node/Cargo.toml +++ 
b/beacon_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "beacon_node" -version = "7.0.0-beta.4" +version = "7.0.0-beta.5" authors = [ "Paul Hauner ", "Age Manning ( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(warp_utils::json::json()) - .and(network_tx_filter) + .and(network_tx_filter.clone()) .then( |not_synced_filter: Result<(), Rejection>, task_spawner: TaskSpawner, @@ -4015,6 +4016,71 @@ pub fn serve( }, ); + // POST lighthouse/add_peer + let post_lighthouse_add_peer = warp::path("lighthouse") + .and(warp::path("add_peer")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(task_spawner_filter.clone()) + .and(network_globals.clone()) + .and(network_tx_filter.clone()) + .then( + |request_data: api_types::AdminPeer, + task_spawner: TaskSpawner, + network_globals: Arc>, + network_tx: UnboundedSender>| { + task_spawner.blocking_json_task(Priority::P0, move || { + let enr = Enr::from_str(&request_data.enr).map_err(|e| { + warp_utils::reject::custom_bad_request(format!("invalid enr error {}", e)) + })?; + info!( + peer_id = %enr.peer_id(), + multiaddr = ?enr.multiaddr(), + "Adding trusted peer" + ); + network_globals.add_trusted_peer(enr.clone()); + + publish_network_message(&network_tx, NetworkMessage::ConnectTrustedPeer(enr))?; + + Ok(()) + }) + }, + ); + + // POST lighthouse/remove_peer + let post_lighthouse_remove_peer = warp::path("lighthouse") + .and(warp::path("remove_peer")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(task_spawner_filter.clone()) + .and(network_globals.clone()) + .and(network_tx_filter.clone()) + .then( + |request_data: api_types::AdminPeer, + task_spawner: TaskSpawner, + network_globals: Arc>, + network_tx: UnboundedSender>| { + task_spawner.blocking_json_task(Priority::P0, move || { + let enr = Enr::from_str(&request_data.enr).map_err(|e| { + warp_utils::reject::custom_bad_request(format!("invalid enr error {}", e)) + })?; + info!( + peer_id = %enr.peer_id(), + multiaddr = 
?enr.multiaddr(), + "Removing trusted peer" + ); + network_globals.remove_trusted_peer(enr.clone()); + + publish_network_message( + &network_tx, + NetworkMessage::DisconnectTrustedPeer(enr), + )?; + + Ok(()) + }) + }, + ); + // POST lighthouse/liveness let post_lighthouse_liveness = warp::path("lighthouse") .and(warp::path("liveness")) @@ -4774,6 +4840,8 @@ pub fn serve( .uor(post_lighthouse_ui_validator_info) .uor(post_lighthouse_finalize) .uor(post_lighthouse_compaction) + .uor(post_lighthouse_add_peer) + .uor(post_lighthouse_remove_peer) .recover(warp_utils::reject::handle_rejection), ), ) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 768441533f..6d407d2742 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -5750,6 +5750,27 @@ impl ApiTester { self } + pub async fn test_post_lighthouse_add_remove_peer(self) -> Self { + let trusted_peers = self.ctx.network_globals.as_ref().unwrap().trusted_peers(); + // Check that there aren't any trusted peers on startup + assert!(trusted_peers.is_empty()); + let enr = AdminPeer {enr: "enr:-QESuEDpVVjo8dmDuneRhLnXdIGY3e9NQiaG4sJR3GS-VMQCQDsmBYoQhJRaPeZzPlTsZj2F8v-iV4lKJEYIRIyztqexHodhdHRuZXRziAwAAAAAAAAAhmNsaWVudNiKTGlnaHRob3VzZYw3LjAuMC1iZXRhLjSEZXRoMpDS8Zl_YAAJEAAIAAAAAAAAgmlkgnY0gmlwhIe11XmDaXA2kCoBBPkAOitZAAAAAAAAAAKEcXVpY4IjKYVxdWljNoIjg4lzZWNwMjU2azGhA43ihEr9BUVVnIHIfFqBR3Izs4YRHHPsTqIbUgEb3Hc8iHN5bmNuZXRzD4N0Y3CCIyiEdGNwNoIjgoN1ZHCCIyiEdWRwNoIjgg".to_string()}; + self.client + .post_lighthouse_add_peer(enr.clone()) + .await + .unwrap(); + let trusted_peers = self.ctx.network_globals.as_ref().unwrap().trusted_peers(); + // Should have 1 trusted peer + assert_eq!(trusted_peers.len(), 1); + + self.client.post_lighthouse_remove_peer(enr).await.unwrap(); + let trusted_peers = self.ctx.network_globals.as_ref().unwrap().trusted_peers(); + // Should be empty after removing + assert!(trusted_peers.is_empty()); + + self + } + pub async fn 
test_post_lighthouse_liveness(self) -> Self { let epoch = self.chain.epoch().unwrap(); let head_state = self.chain.head_beacon_state_cloned(); @@ -7314,6 +7335,8 @@ async fn lighthouse_endpoints() { .test_post_lighthouse_database_reconstruct() .await .test_post_lighthouse_liveness() + .await + .test_post_lighthouse_add_remove_peer() .await; } diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 8c642ec91f..4b48c7e625 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -114,6 +114,7 @@ pub struct PeerManager { metrics_enabled: bool, /// Keeps track of whether the QUIC protocol is enabled or not. quic_enabled: bool, + trusted_peers: HashSet, } /// The events that the `PeerManager` outputs (requests). @@ -192,6 +193,7 @@ impl PeerManager { discovery_enabled, metrics_enabled, quic_enabled, + trusted_peers: Default::default(), }) } @@ -888,7 +890,7 @@ impl PeerManager { } // Gracefully disconnects a peer without banning them. - fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { + pub fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { self.events .push(PeerManagerEvent::DisconnectPeer(peer_id, reason)); self.network_globals @@ -936,6 +938,13 @@ impl PeerManager { } } + fn maintain_trusted_peers(&mut self) { + let trusted_peers = self.trusted_peers.clone(); + for trusted_peer in trusted_peers { + self.dial_peer(trusted_peer); + } + } + /// This function checks the status of our current peers and optionally requests a discovery /// query if we need to find more peers to maintain the current number of peers fn maintain_peer_count(&mut self, dialing_peers: usize) { @@ -1233,6 +1242,7 @@ impl PeerManager { fn heartbeat(&mut self) { // Optionally run a discovery query if we need more peers. 
self.maintain_peer_count(0); + self.maintain_trusted_peers(); // Cleans up the connection state of dialing peers. // Libp2p dials peer-ids, but sometimes the response is from another peer-id or libp2p @@ -1469,6 +1479,14 @@ impl PeerManager { ) }) } + + pub fn add_trusted_peer(&mut self, enr: Enr) { + self.trusted_peers.insert(enr); + } + + pub fn remove_trusted_peer(&mut self, enr: Enr) { + self.trusted_peers.remove(&enr); + } } enum ConnectingType { diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 4a0388058b..54e74457b8 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -9,7 +9,7 @@ use std::net::IpAddr; use std::time::Instant; use std::{cmp::Ordering, fmt::Display}; use std::{ - collections::{HashMap, HashSet}, + collections::{hash_map::Entry, HashMap, HashSet}, fmt::Formatter, }; use sync_status::SyncStatus; @@ -77,6 +77,33 @@ impl PeerDB { self.peers.iter() } + pub fn set_trusted_peer(&mut self, enr: Enr) { + match self.peers.entry(enr.peer_id()) { + Entry::Occupied(mut info) => { + let entry = info.get_mut(); + entry.score = Score::max_score(); + entry.is_trusted = true; + } + Entry::Vacant(entry) => { + entry.insert(PeerInfo::trusted_peer_info()); + } + } + } + + pub fn unset_trusted_peer(&mut self, enr: Enr) { + if let Some(info) = self.peers.get_mut(&enr.peer_id()) { + info.is_trusted = false; + info.score = Score::default(); + } + } + + pub fn trusted_peers(&self) -> Vec { + self.peers + .iter() + .filter_map(|(id, info)| if info.is_trusted { Some(*id) } else { None }) + .collect() + } + /// Gives the ids of all known peers. 
pub fn peer_ids(&self) -> impl Iterator { self.peers.keys() diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index 4cb94ca383..4c47df6343 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -22,7 +22,7 @@ use PeerConnectionStatus::*; #[serde(bound = "E: EthSpec")] pub struct PeerInfo { /// The peers reputation - score: Score, + pub(crate) score: Score, /// Client managing this peer client: Client, /// Connection status of this peer @@ -51,7 +51,7 @@ pub struct PeerInfo { #[serde(skip)] min_ttl: Option, /// Is the peer a trusted peer. - is_trusted: bool, + pub(crate) is_trusted: bool, /// Direction of the first connection of the last (or current) connected session with this peer. /// None if this peer was never connected. connection_direction: Option, diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 3f0b5b96ef..9650976c63 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1475,6 +1475,21 @@ impl Network { } } + /// Adds the given `enr` to the trusted peers mapping and tries to dial it + /// every heartbeat to maintain the connection. + pub fn dial_trusted_peer(&mut self, enr: Enr) { + self.peer_manager_mut().add_trusted_peer(enr.clone()); + self.peer_manager_mut().dial_peer(enr); + } + + /// Remove the given peer from the trusted peers mapping if it exists and disconnect + /// from it. + pub fn remove_trusted_peer(&mut self, enr: Enr) { + self.peer_manager_mut().remove_trusted_peer(enr.clone()); + self.peer_manager_mut() + .disconnect_peer(enr.peer_id(), GoodbyeReason::TooManyPeers); + } + /* Sub-behaviour event handling functions */ /// Handle a gossipsub event. 
diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index f41f60008e..3031a0dff7 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -172,6 +172,18 @@ impl NetworkGlobals { .unwrap_or_default() } + pub fn add_trusted_peer(&self, enr: Enr) { + self.peers.write().set_trusted_peer(enr); + } + + pub fn remove_trusted_peer(&self, enr: Enr) { + self.peers.write().unset_trusted_peer(enr); + } + + pub fn trusted_peers(&self) -> Vec { + self.peers.read().trusted_peers() + } + /// Updates the syncing state of the node. /// /// The old state is returned diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index d25e8509a4..778ac63290 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -13,6 +13,7 @@ use futures::prelude::*; use lighthouse_network::rpc::{RequestId, RequestType}; use lighthouse_network::service::Network; use lighthouse_network::types::GossipKind; +use lighthouse_network::Enr; use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance}; use lighthouse_network::{ rpc::{GoodbyeReason, RpcErrorResponse}, @@ -101,6 +102,10 @@ pub enum NetworkMessage { reason: GoodbyeReason, source: ReportSource, }, + /// Connect to a trusted peer and try to maintain the connection. + ConnectTrustedPeer(Enr), + /// Disconnect from a trusted peer and remove it from the `trusted_peers` mapping. + DisconnectTrustedPeer(Enr), } /// Messages triggered by validators that may trigger a subscription to a subnet. 
@@ -666,6 +671,12 @@ impl NetworkService { reason, source, } => self.libp2p.goodbye_peer(&peer_id, reason, source), + NetworkMessage::ConnectTrustedPeer(enr) => { + self.libp2p.dial_trusted_peer(enr); + } + NetworkMessage::DisconnectTrustedPeer(enr) => { + self.libp2p.remove_trusted_peer(enr); + } NetworkMessage::SubscribeCoreTopics => { if self.subscribed_core_topics() { return; diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs index a29e680eb5..a659c65452 100644 --- a/beacon_node/store/src/hdiff.rs +++ b/beacon_node/store/src/hdiff.rs @@ -21,8 +21,8 @@ static EMPTY_PUBKEY: LazyLock = LazyLock::new(PublicKeyBytes::em pub enum Error { InvalidHierarchy, DiffDeletionsNotSupported, - UnableToComputeDiff, - UnableToApplyDiff, + UnableToComputeDiff(xdelta3::Error), + UnableToApplyDiff(xdelta3::Error), BalancesIncompleteChunk, Compression(std::io::Error), InvalidSszState(ssz::DecodeError), @@ -323,9 +323,15 @@ impl BytesDiff { } pub fn compute_xdelta(source_bytes: &[u8], target_bytes: &[u8]) -> Result { - let bytes = xdelta3::encode(target_bytes, source_bytes) - .ok_or(Error::UnableToComputeDiff) - .unwrap(); + // TODO(hdiff): Use a smaller estimate for the output diff buffer size, currently the + // xdelta3 lib will use 2x the size of the source plus the target length, which is 4x the + // size of the hdiff buffer. In practice, diffs are almost always smaller than buffers (by a + // significant factor), so this is 4-16x larger than necessary in a temporary allocation. + // + // We should use an estimated size that *should* be enough, and then dynamically increase it + // if we hit an insufficient space error.
+ let bytes = + xdelta3::encode(target_bytes, source_bytes).map_err(Error::UnableToComputeDiff)?; Ok(Self { bytes }) } @@ -334,8 +340,31 @@ impl BytesDiff { } pub fn apply_xdelta(&self, source: &[u8], target: &mut Vec) -> Result<(), Error> { - *target = xdelta3::decode(&self.bytes, source).ok_or(Error::UnableToApplyDiff)?; - Ok(()) + // TODO(hdiff): Dynamic buffer allocation. This is a stopgap until we implement a schema + // change to store the output buffer size inside the `BytesDiff`. + let mut output_length = ((source.len() + self.bytes.len()) * 3) / 2; + let mut num_resizes = 0; + loop { + match xdelta3::decode_with_output_len(&self.bytes, source, output_length as u32) { + Ok(result_buffer) => { + *target = result_buffer; + + metrics::observe( + &metrics::BEACON_HDIFF_BUFFER_APPLY_RESIZES, + num_resizes as f64, + ); + return Ok(()); + } + Err(xdelta3::Error::InsufficientOutputLength) => { + // Double the output buffer length and try again. + output_length *= 2; + num_resizes += 1; + } + Err(err) => { + return Err(Error::UnableToApplyDiff(err)); + } + } + } } /// Byte size of this instance diff --git a/beacon_node/store/src/metrics.rs b/beacon_node/store/src/metrics.rs index 6f9f667917..5da73c3cad 100644 --- a/beacon_node/store/src/metrics.rs +++ b/beacon_node/store/src/metrics.rs @@ -202,6 +202,13 @@ pub static BEACON_HDIFF_BUFFER_CLONE_TIMES: LazyLock> = LazyLo "Time required to clone hierarchical diff buffer bytes", ) }); +pub static BEACON_HDIFF_BUFFER_APPLY_RESIZES: LazyLock> = LazyLock::new(|| { + try_create_histogram_with_buckets( + "store_hdiff_buffer_apply_resizes", + "Number of times during diff application that the output buffer had to be resized before decoding succeeded", + Ok(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0]) + ) +}); /* * Beacon Block */ diff --git a/boot_node/Cargo.toml b/boot_node/Cargo.toml index c9f0c04fca..5638be0564 100644 --- a/boot_node/Cargo.toml +++ b/boot_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "boot_node" -version = 
"7.0.0-beta.4" +version = "7.0.0-beta.5" authors = ["Sigma Prime "] edition = { workspace = true } diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index a9f2f471b0..9a5d9100cf 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -7,7 +7,10 @@ pub mod sync_state; use crate::{ lighthouse::sync_state::SyncState, - types::{DepositTreeSnapshot, Epoch, FinalizedExecutionBlock, GenericResponse, ValidatorId}, + types::{ + AdminPeer, DepositTreeSnapshot, Epoch, FinalizedExecutionBlock, GenericResponse, + ValidatorId, + }, BeaconNodeHttpClient, DepositData, Error, Eth1Data, Hash256, Slot, }; use proto_array::core::ProtoArray; @@ -365,6 +368,30 @@ impl BeaconNodeHttpClient { self.post_with_response(path, &()).await } + /// `POST lighthouse/add_peer` + pub async fn post_lighthouse_add_peer(&self, req: AdminPeer) -> Result<(), Error> { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("lighthouse") + .push("add_peer"); + + self.post_with_response(path, &req).await + } + + /// `POST lighthouse/remove_peer` + pub async fn post_lighthouse_remove_peer(&self, req: AdminPeer) -> Result<(), Error> { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("lighthouse") + .push("remove_peer"); + + self.post_with_response(path, &req).await + } + /* Analysis endpoints. 
*/ diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 9839fcfda4..2e9bac0397 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1411,6 +1411,11 @@ pub struct ManualFinalizationRequestData { pub block_root: Hash256, } +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct AdminPeer { + pub enr: String, +} + #[derive(Debug, Serialize, Deserialize)] pub struct LivenessRequestData { pub epoch: Epoch, diff --git a/common/lighthouse_version/src/lib.rs b/common/lighthouse_version/src/lib.rs index 1c62cd7b8a..bd5e31e3ab 100644 --- a/common/lighthouse_version/src/lib.rs +++ b/common/lighthouse_version/src/lib.rs @@ -17,8 +17,8 @@ pub const VERSION: &str = git_version!( // NOTE: using --match instead of --exclude for compatibility with old Git "--match=thiswillnevermatchlol" ], - prefix = "Lighthouse/v7.0.0-beta.4-", - fallback = "Lighthouse/v7.0.0-beta.4" + prefix = "Lighthouse/v7.0.0-beta.5-", + fallback = "Lighthouse/v7.0.0-beta.5" ); /// Returns the first eight characters of the latest commit hash for this build. @@ -54,7 +54,7 @@ pub fn version_with_platform() -> String { /// /// `1.5.1` pub fn version() -> &'static str { - "7.0.0-beta.4" + "7.0.0-beta.5" } /// Returns the name of the current client running. diff --git a/consensus/state_processing/src/per_epoch_processing/single_pass.rs b/consensus/state_processing/src/per_epoch_processing/single_pass.rs index 5c31669a60..af6a0936e2 100644 --- a/consensus/state_processing/src/per_epoch_processing/single_pass.rs +++ b/consensus/state_processing/src/per_epoch_processing/single_pass.rs @@ -175,6 +175,7 @@ pub fn process_epoch_single_pass( let mut earliest_exit_epoch = state.earliest_exit_epoch().ok(); let mut exit_balance_to_consume = state.exit_balance_to_consume().ok(); + let validators_in_consolidations = get_validators_in_consolidations(state); // Split the state into several disjoint mutable borrows. 
let ( @@ -317,17 +318,26 @@ pub fn process_epoch_single_pass( // `process_effective_balance_updates` if conf.effective_balance_updates { - process_single_effective_balance_update( - validator_info.index, - *balance, - &mut validator, - validator_info.current_epoch_participation, - &mut next_epoch_cache, - progressive_balances, - effective_balances_ctxt, - state_ctxt, - spec, - )?; + if validators_in_consolidations.contains(&validator_info.index) { + process_single_dummy_effective_balance_update( + validator_info.index, + &validator, + &mut next_epoch_cache, + state_ctxt, + )?; + } else { + process_single_effective_balance_update( + validator_info.index, + *balance, + &mut validator, + validator_info.current_epoch_participation, + &mut next_epoch_cache, + progressive_balances, + effective_balances_ctxt, + state_ctxt, + spec, + )?; + } } } @@ -430,6 +440,7 @@ pub fn process_epoch_single_pass( if fork_name.electra_enabled() && conf.pending_consolidations { process_pending_consolidations( state, + &validators_in_consolidations, &mut next_epoch_cache, effective_balances_ctxt, conf.effective_balance_updates, @@ -1026,12 +1037,38 @@ fn process_pending_deposits_for_validator( Ok(()) } +/// Return the set of validators referenced by consolidations, either as source or target. +/// +/// This function is blind to whether the consolidations are valid and capable of being processed, +/// it just returns the set of all indices present in consolidations. This is *sufficient* to +/// make consolidations play nicely with effective balance updates. The algorithm used is: +/// +/// - In the single pass: apply effective balance updates for all validators *not* referenced by +/// consolidations. +/// - Apply consolidations. +/// - Apply effective balance updates for all validators previously skipped. +/// +/// Prior to Electra, the empty set is returned. 
+fn get_validators_in_consolidations(state: &BeaconState) -> BTreeSet { + let mut referenced_validators = BTreeSet::new(); + + if let Ok(pending_consolidations) = state.pending_consolidations() { + for pending_consolidation in pending_consolidations { + referenced_validators.insert(pending_consolidation.source_index as usize); + referenced_validators.insert(pending_consolidation.target_index as usize); + } + } + + referenced_validators +} + /// We process pending consolidations after all of single-pass epoch processing, and then patch up /// the effective balances for affected validators. /// /// This is safe because processing consolidations does not depend on the `effective_balance`. fn process_pending_consolidations( state: &mut BeaconState, + validators_in_consolidations: &BTreeSet, next_epoch_cache: &mut PreEpochCache, effective_balances_ctxt: &EffectiveBalancesContext, perform_effective_balance_updates: bool, @@ -1042,8 +1079,6 @@ fn process_pending_consolidations( let next_epoch = state.next_epoch()?; let pending_consolidations = state.pending_consolidations()?.clone(); - let mut affected_validators = BTreeSet::new(); - for pending_consolidation in &pending_consolidations { let source_index = pending_consolidation.source_index as usize; let target_index = pending_consolidation.target_index as usize; @@ -1069,9 +1104,6 @@ fn process_pending_consolidations( decrease_balance(state, source_index, source_effective_balance)?; increase_balance(state, target_index, source_effective_balance)?; - affected_validators.insert(source_index); - affected_validators.insert(target_index); - next_pending_consolidation.safe_add_assign(1)?; } @@ -1087,7 +1119,7 @@ fn process_pending_consolidations( // Re-process effective balance updates for validators affected by consolidations. 
let (validators, balances, _, current_epoch_participation, _, progressive_balances, _, _) = state.mutable_validator_fields()?; - for validator_index in affected_validators { + for &validator_index in validators_in_consolidations { let balance = *balances .get(validator_index) .ok_or(BeaconStateError::UnknownValidator(validator_index))?; @@ -1129,6 +1161,28 @@ impl EffectiveBalancesContext { } } +/// This function is called for validators that do not have their effective balance updated as +/// part of the single-pass loop. For these validators we compute their true effective balance +/// update after processing consolidations. However, to maintain the invariants of the +/// `PreEpochCache` we must register _some_ effective balance for them immediately. +fn process_single_dummy_effective_balance_update( + validator_index: usize, + validator: &Cow, + next_epoch_cache: &mut PreEpochCache, + state_ctxt: &StateContext, +) -> Result<(), Error> { + // Populate the effective balance cache with the current effective balance. This will be + // overridden when `process_single_effective_balance_update` is called. + let is_active_next_epoch = validator.is_active_at(state_ctxt.next_epoch); + let temporary_effective_balance = validator.effective_balance; + next_epoch_cache.update_effective_balance( + validator_index, + temporary_effective_balance, + is_active_next_epoch, + )?; + Ok(()) +} + +/// This function abstracts over phase0 and Electra effective balance processing.
#[allow(clippy::too_many_arguments)] fn process_single_effective_balance_update( diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index c4f4d1699c..22b19f7413 100644 --- a/lcli/Cargo.toml +++ b/lcli/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "lcli" description = "Lighthouse CLI (modeled after zcli)" -version = "7.0.0-beta.4" +version = "7.0.0-beta.5" authors = ["Paul Hauner "] edition = { workspace = true } diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index b968440f44..3774a9c458 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lighthouse" -version = "7.0.0-beta.4" +version = "7.0.0-beta.5" authors = ["Sigma Prime "] edition = { workspace = true } autotests = false