mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-29 10:54:24 +00:00
Merge remote-tracking branch 'origin/release-v7.0.0' into unstable
This commit is contained in:
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -808,7 +808,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "beacon_node"
|
||||
version = "7.0.0-beta.4"
|
||||
version = "7.0.0-beta.5"
|
||||
dependencies = [
|
||||
"account_utils",
|
||||
"beacon_chain",
|
||||
@@ -1046,7 +1046,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "boot_node"
|
||||
version = "7.0.0-beta.4"
|
||||
version = "7.0.0-beta.5"
|
||||
dependencies = [
|
||||
"beacon_node",
|
||||
"bytes",
|
||||
@@ -4690,7 +4690,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "lcli"
|
||||
version = "7.0.0-beta.4"
|
||||
version = "7.0.0-beta.5"
|
||||
dependencies = [
|
||||
"account_utils",
|
||||
"beacon_chain",
|
||||
@@ -5252,7 +5252,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lighthouse"
|
||||
version = "7.0.0-beta.4"
|
||||
version = "7.0.0-beta.5"
|
||||
dependencies = [
|
||||
"account_manager",
|
||||
"account_utils",
|
||||
@@ -10230,7 +10230,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "xdelta3"
|
||||
version = "0.1.5"
|
||||
source = "git+http://github.com/sigp/xdelta3-rs?rev=50d63cdf1878e5cf3538e9aae5eed34a22c64e4a#50d63cdf1878e5cf3538e9aae5eed34a22c64e4a"
|
||||
source = "git+http://github.com/sigp/xdelta3-rs?rev=4db64086bb02e9febb584ba93b9d16bb2ae3825a#4db64086bb02e9febb584ba93b9d16bb2ae3825a"
|
||||
dependencies = [
|
||||
"bindgen",
|
||||
"cc",
|
||||
|
||||
@@ -278,7 +278,7 @@ validator_metrics = { path = "validator_client/validator_metrics" }
|
||||
validator_store = { path = "validator_client/validator_store" }
|
||||
validator_test_rig = { path = "testing/validator_test_rig" }
|
||||
warp_utils = { path = "common/warp_utils" }
|
||||
xdelta3 = { git = "http://github.com/sigp/xdelta3-rs", rev = "50d63cdf1878e5cf3538e9aae5eed34a22c64e4a" }
|
||||
xdelta3 = { git = "http://github.com/sigp/xdelta3-rs", rev = "4db64086bb02e9febb584ba93b9d16bb2ae3825a" }
|
||||
zstd = "0.13"
|
||||
|
||||
[profile.maxperf]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "beacon_node"
|
||||
version = "7.0.0-beta.4"
|
||||
version = "7.0.0-beta.5"
|
||||
authors = [
|
||||
"Paul Hauner <paul@paulhauner.com>",
|
||||
"Age Manning <Age@AgeManning.com",
|
||||
|
||||
@@ -820,10 +820,30 @@ where
|
||||
));
|
||||
}
|
||||
|
||||
let validator_pubkey_cache = self.validator_pubkey_cache.map(Ok).unwrap_or_else(|| {
|
||||
ValidatorPubkeyCache::new(&head_snapshot.beacon_state, store.clone())
|
||||
.map_err(|e| format!("Unable to init validator pubkey cache: {:?}", e))
|
||||
})?;
|
||||
let validator_pubkey_cache = self
|
||||
.validator_pubkey_cache
|
||||
.map(|mut validator_pubkey_cache| {
|
||||
// If any validators weren't persisted to disk on previous runs, this will use the head state to
|
||||
// "top-up" the in-memory validator cache and its on-disk representation with any missing validators.
|
||||
let pubkey_store_ops = validator_pubkey_cache
|
||||
.import_new_pubkeys(&head_snapshot.beacon_state)
|
||||
.map_err(|e| format!("Unable to top-up persisted pubkey cache {:?}", e))?;
|
||||
if !pubkey_store_ops.is_empty() {
|
||||
// Write any missed validators to disk
|
||||
debug!(
|
||||
missing_validators = pubkey_store_ops.len(),
|
||||
"Topping up validator pubkey cache"
|
||||
);
|
||||
store
|
||||
.do_atomically_with_block_and_blobs_cache(pubkey_store_ops)
|
||||
.map_err(|e| format!("Unable to write pubkeys to disk {:?}", e))?;
|
||||
}
|
||||
Ok(validator_pubkey_cache)
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
ValidatorPubkeyCache::new(&head_snapshot.beacon_state, store.clone())
|
||||
.map_err(|e| format!("Unable to init validator pubkey cache: {:?}", e))
|
||||
})?;
|
||||
|
||||
let migrator_config = self.store_migrator_config.unwrap_or_default();
|
||||
let store_migrator =
|
||||
|
||||
@@ -54,7 +54,7 @@ use eth2::types::{
|
||||
use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
|
||||
use health_metrics::observe::Observe;
|
||||
use lighthouse_network::rpc::methods::MetaData;
|
||||
use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
|
||||
use lighthouse_network::{types::SyncState, Enr, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
|
||||
use lighthouse_version::version_with_platform;
|
||||
use logging::{crit, SSELoggingComponents};
|
||||
use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage};
|
||||
@@ -72,6 +72,7 @@ use std::future::Future;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use sysinfo::{System, SystemExt};
|
||||
use system_health::{observe_nat, observe_system_health_bn};
|
||||
@@ -3586,7 +3587,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
.and(task_spawner_filter.clone())
|
||||
.and(chain_filter.clone())
|
||||
.and(warp_utils::json::json())
|
||||
.and(network_tx_filter)
|
||||
.and(network_tx_filter.clone())
|
||||
.then(
|
||||
|not_synced_filter: Result<(), Rejection>,
|
||||
task_spawner: TaskSpawner<T::EthSpec>,
|
||||
@@ -4015,6 +4016,71 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
},
|
||||
);
|
||||
|
||||
// POST lighthouse/add_peer
|
||||
let post_lighthouse_add_peer = warp::path("lighthouse")
|
||||
.and(warp::path("add_peer"))
|
||||
.and(warp::path::end())
|
||||
.and(warp_utils::json::json())
|
||||
.and(task_spawner_filter.clone())
|
||||
.and(network_globals.clone())
|
||||
.and(network_tx_filter.clone())
|
||||
.then(
|
||||
|request_data: api_types::AdminPeer,
|
||||
task_spawner: TaskSpawner<T::EthSpec>,
|
||||
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
|
||||
task_spawner.blocking_json_task(Priority::P0, move || {
|
||||
let enr = Enr::from_str(&request_data.enr).map_err(|e| {
|
||||
warp_utils::reject::custom_bad_request(format!("invalid enr error {}", e))
|
||||
})?;
|
||||
info!(
|
||||
peer_id = %enr.peer_id(),
|
||||
multiaddr = ?enr.multiaddr(),
|
||||
"Adding trusted peer"
|
||||
);
|
||||
network_globals.add_trusted_peer(enr.clone());
|
||||
|
||||
publish_network_message(&network_tx, NetworkMessage::ConnectTrustedPeer(enr))?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
},
|
||||
);
|
||||
|
||||
// POST lighthouse/remove_peer
|
||||
let post_lighthouse_remove_peer = warp::path("lighthouse")
|
||||
.and(warp::path("remove_peer"))
|
||||
.and(warp::path::end())
|
||||
.and(warp_utils::json::json())
|
||||
.and(task_spawner_filter.clone())
|
||||
.and(network_globals.clone())
|
||||
.and(network_tx_filter.clone())
|
||||
.then(
|
||||
|request_data: api_types::AdminPeer,
|
||||
task_spawner: TaskSpawner<T::EthSpec>,
|
||||
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
||||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
|
||||
task_spawner.blocking_json_task(Priority::P0, move || {
|
||||
let enr = Enr::from_str(&request_data.enr).map_err(|e| {
|
||||
warp_utils::reject::custom_bad_request(format!("invalid enr error {}", e))
|
||||
})?;
|
||||
info!(
|
||||
peer_id = %enr.peer_id(),
|
||||
multiaddr = ?enr.multiaddr(),
|
||||
"Removing trusted peer"
|
||||
);
|
||||
network_globals.remove_trusted_peer(enr.clone());
|
||||
|
||||
publish_network_message(
|
||||
&network_tx,
|
||||
NetworkMessage::DisconnectTrustedPeer(enr),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
},
|
||||
);
|
||||
|
||||
// POST lighthouse/liveness
|
||||
let post_lighthouse_liveness = warp::path("lighthouse")
|
||||
.and(warp::path("liveness"))
|
||||
@@ -4774,6 +4840,8 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
.uor(post_lighthouse_ui_validator_info)
|
||||
.uor(post_lighthouse_finalize)
|
||||
.uor(post_lighthouse_compaction)
|
||||
.uor(post_lighthouse_add_peer)
|
||||
.uor(post_lighthouse_remove_peer)
|
||||
.recover(warp_utils::reject::handle_rejection),
|
||||
),
|
||||
)
|
||||
|
||||
@@ -5750,6 +5750,27 @@ impl ApiTester {
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_post_lighthouse_add_remove_peer(self) -> Self {
|
||||
let trusted_peers = self.ctx.network_globals.as_ref().unwrap().trusted_peers();
|
||||
// Check that there aren't any trusted peers on startup
|
||||
assert!(trusted_peers.is_empty());
|
||||
let enr = AdminPeer {enr: "enr:-QESuEDpVVjo8dmDuneRhLnXdIGY3e9NQiaG4sJR3GS-VMQCQDsmBYoQhJRaPeZzPlTsZj2F8v-iV4lKJEYIRIyztqexHodhdHRuZXRziAwAAAAAAAAAhmNsaWVudNiKTGlnaHRob3VzZYw3LjAuMC1iZXRhLjSEZXRoMpDS8Zl_YAAJEAAIAAAAAAAAgmlkgnY0gmlwhIe11XmDaXA2kCoBBPkAOitZAAAAAAAAAAKEcXVpY4IjKYVxdWljNoIjg4lzZWNwMjU2azGhA43ihEr9BUVVnIHIfFqBR3Izs4YRHHPsTqIbUgEb3Hc8iHN5bmNuZXRzD4N0Y3CCIyiEdGNwNoIjgoN1ZHCCIyiEdWRwNoIjgg".to_string()};
|
||||
self.client
|
||||
.post_lighthouse_add_peer(enr.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let trusted_peers = self.ctx.network_globals.as_ref().unwrap().trusted_peers();
|
||||
// Should have 1 trusted peer
|
||||
assert_eq!(trusted_peers.len(), 1);
|
||||
|
||||
self.client.post_lighthouse_remove_peer(enr).await.unwrap();
|
||||
let trusted_peers = self.ctx.network_globals.as_ref().unwrap().trusted_peers();
|
||||
// Should be empty after removing
|
||||
assert!(trusted_peers.is_empty());
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_post_lighthouse_liveness(self) -> Self {
|
||||
let epoch = self.chain.epoch().unwrap();
|
||||
let head_state = self.chain.head_beacon_state_cloned();
|
||||
@@ -7314,6 +7335,8 @@ async fn lighthouse_endpoints() {
|
||||
.test_post_lighthouse_database_reconstruct()
|
||||
.await
|
||||
.test_post_lighthouse_liveness()
|
||||
.await
|
||||
.test_post_lighthouse_add_remove_peer()
|
||||
.await;
|
||||
}
|
||||
|
||||
|
||||
@@ -114,6 +114,7 @@ pub struct PeerManager<E: EthSpec> {
|
||||
metrics_enabled: bool,
|
||||
/// Keeps track of whether the QUIC protocol is enabled or not.
|
||||
quic_enabled: bool,
|
||||
trusted_peers: HashSet<Enr>,
|
||||
}
|
||||
|
||||
/// The events that the `PeerManager` outputs (requests).
|
||||
@@ -192,6 +193,7 @@ impl<E: EthSpec> PeerManager<E> {
|
||||
discovery_enabled,
|
||||
metrics_enabled,
|
||||
quic_enabled,
|
||||
trusted_peers: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -888,7 +890,7 @@ impl<E: EthSpec> PeerManager<E> {
|
||||
}
|
||||
|
||||
// Gracefully disconnects a peer without banning them.
|
||||
fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
|
||||
pub fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
|
||||
self.events
|
||||
.push(PeerManagerEvent::DisconnectPeer(peer_id, reason));
|
||||
self.network_globals
|
||||
@@ -936,6 +938,13 @@ impl<E: EthSpec> PeerManager<E> {
|
||||
}
|
||||
}
|
||||
|
||||
fn maintain_trusted_peers(&mut self) {
|
||||
let trusted_peers = self.trusted_peers.clone();
|
||||
for trusted_peer in trusted_peers {
|
||||
self.dial_peer(trusted_peer);
|
||||
}
|
||||
}
|
||||
|
||||
/// This function checks the status of our current peers and optionally requests a discovery
|
||||
/// query if we need to find more peers to maintain the current number of peers
|
||||
fn maintain_peer_count(&mut self, dialing_peers: usize) {
|
||||
@@ -1233,6 +1242,7 @@ impl<E: EthSpec> PeerManager<E> {
|
||||
fn heartbeat(&mut self) {
|
||||
// Optionally run a discovery query if we need more peers.
|
||||
self.maintain_peer_count(0);
|
||||
self.maintain_trusted_peers();
|
||||
|
||||
// Cleans up the connection state of dialing peers.
|
||||
// Libp2p dials peer-ids, but sometimes the response is from another peer-id or libp2p
|
||||
@@ -1469,6 +1479,14 @@ impl<E: EthSpec> PeerManager<E> {
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn add_trusted_peer(&mut self, enr: Enr) {
|
||||
self.trusted_peers.insert(enr);
|
||||
}
|
||||
|
||||
pub fn remove_trusted_peer(&mut self, enr: Enr) {
|
||||
self.trusted_peers.remove(&enr);
|
||||
}
|
||||
}
|
||||
|
||||
enum ConnectingType {
|
||||
|
||||
@@ -9,7 +9,7 @@ use std::net::IpAddr;
|
||||
use std::time::Instant;
|
||||
use std::{cmp::Ordering, fmt::Display};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
collections::{hash_map::Entry, HashMap, HashSet},
|
||||
fmt::Formatter,
|
||||
};
|
||||
use sync_status::SyncStatus;
|
||||
@@ -77,6 +77,33 @@ impl<E: EthSpec> PeerDB<E> {
|
||||
self.peers.iter()
|
||||
}
|
||||
|
||||
pub fn set_trusted_peer(&mut self, enr: Enr) {
|
||||
match self.peers.entry(enr.peer_id()) {
|
||||
Entry::Occupied(mut info) => {
|
||||
let entry = info.get_mut();
|
||||
entry.score = Score::max_score();
|
||||
entry.is_trusted = true;
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(PeerInfo::trusted_peer_info());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unset_trusted_peer(&mut self, enr: Enr) {
|
||||
if let Some(info) = self.peers.get_mut(&enr.peer_id()) {
|
||||
info.is_trusted = false;
|
||||
info.score = Score::default();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn trusted_peers(&self) -> Vec<PeerId> {
|
||||
self.peers
|
||||
.iter()
|
||||
.filter_map(|(id, info)| if info.is_trusted { Some(*id) } else { None })
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Gives the ids of all known peers.
|
||||
pub fn peer_ids(&self) -> impl Iterator<Item = &PeerId> {
|
||||
self.peers.keys()
|
||||
|
||||
@@ -22,7 +22,7 @@ use PeerConnectionStatus::*;
|
||||
#[serde(bound = "E: EthSpec")]
|
||||
pub struct PeerInfo<E: EthSpec> {
|
||||
/// The peers reputation
|
||||
score: Score,
|
||||
pub(crate) score: Score,
|
||||
/// Client managing this peer
|
||||
client: Client,
|
||||
/// Connection status of this peer
|
||||
@@ -51,7 +51,7 @@ pub struct PeerInfo<E: EthSpec> {
|
||||
#[serde(skip)]
|
||||
min_ttl: Option<Instant>,
|
||||
/// Is the peer a trusted peer.
|
||||
is_trusted: bool,
|
||||
pub(crate) is_trusted: bool,
|
||||
/// Direction of the first connection of the last (or current) connected session with this peer.
|
||||
/// None if this peer was never connected.
|
||||
connection_direction: Option<ConnectionDirection>,
|
||||
|
||||
@@ -1475,6 +1475,21 @@ impl<E: EthSpec> Network<E> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds the given `enr` to the trusted peers mapping and tries to dial it
|
||||
/// every heartbeat to maintain the connection.
|
||||
pub fn dial_trusted_peer(&mut self, enr: Enr) {
|
||||
self.peer_manager_mut().add_trusted_peer(enr.clone());
|
||||
self.peer_manager_mut().dial_peer(enr);
|
||||
}
|
||||
|
||||
/// Remove the given peer from the trusted peers mapping if it exists and disconnect
|
||||
/// from it.
|
||||
pub fn remove_trusted_peer(&mut self, enr: Enr) {
|
||||
self.peer_manager_mut().remove_trusted_peer(enr.clone());
|
||||
self.peer_manager_mut()
|
||||
.disconnect_peer(enr.peer_id(), GoodbyeReason::TooManyPeers);
|
||||
}
|
||||
|
||||
/* Sub-behaviour event handling functions */
|
||||
|
||||
/// Handle a gossipsub event.
|
||||
|
||||
@@ -172,6 +172,18 @@ impl<E: EthSpec> NetworkGlobals<E> {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
pub fn add_trusted_peer(&self, enr: Enr) {
|
||||
self.peers.write().set_trusted_peer(enr);
|
||||
}
|
||||
|
||||
pub fn remove_trusted_peer(&self, enr: Enr) {
|
||||
self.peers.write().unset_trusted_peer(enr);
|
||||
}
|
||||
|
||||
pub fn trusted_peers(&self) -> Vec<PeerId> {
|
||||
self.peers.read().trusted_peers()
|
||||
}
|
||||
|
||||
/// Updates the syncing state of the node.
|
||||
///
|
||||
/// The old state is returned
|
||||
|
||||
@@ -13,6 +13,7 @@ use futures::prelude::*;
|
||||
use lighthouse_network::rpc::{RequestId, RequestType};
|
||||
use lighthouse_network::service::Network;
|
||||
use lighthouse_network::types::GossipKind;
|
||||
use lighthouse_network::Enr;
|
||||
use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance};
|
||||
use lighthouse_network::{
|
||||
rpc::{GoodbyeReason, RpcErrorResponse},
|
||||
@@ -101,6 +102,10 @@ pub enum NetworkMessage<E: EthSpec> {
|
||||
reason: GoodbyeReason,
|
||||
source: ReportSource,
|
||||
},
|
||||
/// Connect to a trusted peer and try to maintain the connection.
|
||||
ConnectTrustedPeer(Enr),
|
||||
/// Disconnect from a trusted peer and remove it from the `trusted_peers` mapping.
|
||||
DisconnectTrustedPeer(Enr),
|
||||
}
|
||||
|
||||
/// Messages triggered by validators that may trigger a subscription to a subnet.
|
||||
@@ -666,6 +671,12 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
reason,
|
||||
source,
|
||||
} => self.libp2p.goodbye_peer(&peer_id, reason, source),
|
||||
NetworkMessage::ConnectTrustedPeer(enr) => {
|
||||
self.libp2p.dial_trusted_peer(enr);
|
||||
}
|
||||
NetworkMessage::DisconnectTrustedPeer(enr) => {
|
||||
self.libp2p.remove_trusted_peer(enr);
|
||||
}
|
||||
NetworkMessage::SubscribeCoreTopics => {
|
||||
if self.subscribed_core_topics() {
|
||||
return;
|
||||
|
||||
@@ -21,8 +21,8 @@ static EMPTY_PUBKEY: LazyLock<PublicKeyBytes> = LazyLock::new(PublicKeyBytes::em
|
||||
pub enum Error {
|
||||
InvalidHierarchy,
|
||||
DiffDeletionsNotSupported,
|
||||
UnableToComputeDiff,
|
||||
UnableToApplyDiff,
|
||||
UnableToComputeDiff(xdelta3::Error),
|
||||
UnableToApplyDiff(xdelta3::Error),
|
||||
BalancesIncompleteChunk,
|
||||
Compression(std::io::Error),
|
||||
InvalidSszState(ssz::DecodeError),
|
||||
@@ -323,9 +323,15 @@ impl BytesDiff {
|
||||
}
|
||||
|
||||
pub fn compute_xdelta(source_bytes: &[u8], target_bytes: &[u8]) -> Result<Self, Error> {
|
||||
let bytes = xdelta3::encode(target_bytes, source_bytes)
|
||||
.ok_or(Error::UnableToComputeDiff)
|
||||
.unwrap();
|
||||
// TODO(hdiff): Use a smaller estimate for the output diff buffer size, currently the
|
||||
// xdelta3 lib will use 2x the size of the source plus the target length, which is 4x the
|
||||
// size of the hdiff buffer. In practice, diffs are almost always smaller than buffers (by a
|
||||
// signficiant factor), so this is 4-16x larger than necessary in a temporary allocation.
|
||||
//
|
||||
// We should use an estimated size that *should* be enough, and then dynamically increase it
|
||||
// if we hit an insufficient space error.
|
||||
let bytes =
|
||||
xdelta3::encode(target_bytes, source_bytes).map_err(Error::UnableToComputeDiff)?;
|
||||
Ok(Self { bytes })
|
||||
}
|
||||
|
||||
@@ -334,8 +340,31 @@ impl BytesDiff {
|
||||
}
|
||||
|
||||
pub fn apply_xdelta(&self, source: &[u8], target: &mut Vec<u8>) -> Result<(), Error> {
|
||||
*target = xdelta3::decode(&self.bytes, source).ok_or(Error::UnableToApplyDiff)?;
|
||||
Ok(())
|
||||
// TODO(hdiff): Dynamic buffer allocation. This is a stopgap until we implement a schema
|
||||
// change to store the output buffer size inside the `BytesDiff`.
|
||||
let mut output_length = ((source.len() + self.bytes.len()) * 3) / 2;
|
||||
let mut num_resizes = 0;
|
||||
loop {
|
||||
match xdelta3::decode_with_output_len(&self.bytes, source, output_length as u32) {
|
||||
Ok(result_buffer) => {
|
||||
*target = result_buffer;
|
||||
|
||||
metrics::observe(
|
||||
&metrics::BEACON_HDIFF_BUFFER_APPLY_RESIZES,
|
||||
num_resizes as f64,
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
Err(xdelta3::Error::InsufficientOutputLength) => {
|
||||
// Double the output buffer length and try again.
|
||||
output_length *= 2;
|
||||
num_resizes += 1;
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(Error::UnableToApplyDiff(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Byte size of this instance
|
||||
|
||||
@@ -202,6 +202,13 @@ pub static BEACON_HDIFF_BUFFER_CLONE_TIMES: LazyLock<Result<Histogram>> = LazyLo
|
||||
"Time required to clone hierarchical diff buffer bytes",
|
||||
)
|
||||
});
|
||||
pub static BEACON_HDIFF_BUFFER_APPLY_RESIZES: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram_with_buckets(
|
||||
"store_hdiff_buffer_apply_resizes",
|
||||
"Number of times during diff application that the output buffer had to be resized before decoding succeeded",
|
||||
Ok(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0])
|
||||
)
|
||||
});
|
||||
/*
|
||||
* Beacon Block
|
||||
*/
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "boot_node"
|
||||
version = "7.0.0-beta.4"
|
||||
version = "7.0.0-beta.5"
|
||||
authors = ["Sigma Prime <contact@sigmaprime.io>"]
|
||||
edition = { workspace = true }
|
||||
|
||||
|
||||
@@ -7,7 +7,10 @@ pub mod sync_state;
|
||||
|
||||
use crate::{
|
||||
lighthouse::sync_state::SyncState,
|
||||
types::{DepositTreeSnapshot, Epoch, FinalizedExecutionBlock, GenericResponse, ValidatorId},
|
||||
types::{
|
||||
AdminPeer, DepositTreeSnapshot, Epoch, FinalizedExecutionBlock, GenericResponse,
|
||||
ValidatorId,
|
||||
},
|
||||
BeaconNodeHttpClient, DepositData, Error, Eth1Data, Hash256, Slot,
|
||||
};
|
||||
use proto_array::core::ProtoArray;
|
||||
@@ -365,6 +368,30 @@ impl BeaconNodeHttpClient {
|
||||
self.post_with_response(path, &()).await
|
||||
}
|
||||
|
||||
/// `POST lighthouse/add_peer`
|
||||
pub async fn post_lighthouse_add_peer(&self, req: AdminPeer) -> Result<(), Error> {
|
||||
let mut path = self.server.full.clone();
|
||||
|
||||
path.path_segments_mut()
|
||||
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
|
||||
.push("lighthouse")
|
||||
.push("add_peer");
|
||||
|
||||
self.post_with_response(path, &req).await
|
||||
}
|
||||
|
||||
/// `POST lighthouse/remove_peer`
|
||||
pub async fn post_lighthouse_remove_peer(&self, req: AdminPeer) -> Result<(), Error> {
|
||||
let mut path = self.server.full.clone();
|
||||
|
||||
path.path_segments_mut()
|
||||
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
|
||||
.push("lighthouse")
|
||||
.push("remove_peer");
|
||||
|
||||
self.post_with_response(path, &req).await
|
||||
}
|
||||
|
||||
/*
|
||||
Analysis endpoints.
|
||||
*/
|
||||
|
||||
@@ -1411,6 +1411,11 @@ pub struct ManualFinalizationRequestData {
|
||||
pub block_root: Hash256,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct AdminPeer {
|
||||
pub enr: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct LivenessRequestData {
|
||||
pub epoch: Epoch,
|
||||
|
||||
@@ -17,8 +17,8 @@ pub const VERSION: &str = git_version!(
|
||||
// NOTE: using --match instead of --exclude for compatibility with old Git
|
||||
"--match=thiswillnevermatchlol"
|
||||
],
|
||||
prefix = "Lighthouse/v7.0.0-beta.4-",
|
||||
fallback = "Lighthouse/v7.0.0-beta.4"
|
||||
prefix = "Lighthouse/v7.0.0-beta.5-",
|
||||
fallback = "Lighthouse/v7.0.0-beta.5"
|
||||
);
|
||||
|
||||
/// Returns the first eight characters of the latest commit hash for this build.
|
||||
@@ -54,7 +54,7 @@ pub fn version_with_platform() -> String {
|
||||
///
|
||||
/// `1.5.1`
|
||||
pub fn version() -> &'static str {
|
||||
"7.0.0-beta.4"
|
||||
"7.0.0-beta.5"
|
||||
}
|
||||
|
||||
/// Returns the name of the current client running.
|
||||
|
||||
@@ -175,6 +175,7 @@ pub fn process_epoch_single_pass<E: EthSpec>(
|
||||
|
||||
let mut earliest_exit_epoch = state.earliest_exit_epoch().ok();
|
||||
let mut exit_balance_to_consume = state.exit_balance_to_consume().ok();
|
||||
let validators_in_consolidations = get_validators_in_consolidations(state);
|
||||
|
||||
// Split the state into several disjoint mutable borrows.
|
||||
let (
|
||||
@@ -317,17 +318,26 @@ pub fn process_epoch_single_pass<E: EthSpec>(
|
||||
|
||||
// `process_effective_balance_updates`
|
||||
if conf.effective_balance_updates {
|
||||
process_single_effective_balance_update(
|
||||
validator_info.index,
|
||||
*balance,
|
||||
&mut validator,
|
||||
validator_info.current_epoch_participation,
|
||||
&mut next_epoch_cache,
|
||||
progressive_balances,
|
||||
effective_balances_ctxt,
|
||||
state_ctxt,
|
||||
spec,
|
||||
)?;
|
||||
if validators_in_consolidations.contains(&validator_info.index) {
|
||||
process_single_dummy_effective_balance_update(
|
||||
validator_info.index,
|
||||
&validator,
|
||||
&mut next_epoch_cache,
|
||||
state_ctxt,
|
||||
)?;
|
||||
} else {
|
||||
process_single_effective_balance_update(
|
||||
validator_info.index,
|
||||
*balance,
|
||||
&mut validator,
|
||||
validator_info.current_epoch_participation,
|
||||
&mut next_epoch_cache,
|
||||
progressive_balances,
|
||||
effective_balances_ctxt,
|
||||
state_ctxt,
|
||||
spec,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -430,6 +440,7 @@ pub fn process_epoch_single_pass<E: EthSpec>(
|
||||
if fork_name.electra_enabled() && conf.pending_consolidations {
|
||||
process_pending_consolidations(
|
||||
state,
|
||||
&validators_in_consolidations,
|
||||
&mut next_epoch_cache,
|
||||
effective_balances_ctxt,
|
||||
conf.effective_balance_updates,
|
||||
@@ -1026,12 +1037,38 @@ fn process_pending_deposits_for_validator(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return the set of validators referenced by consolidations, either as source or target.
|
||||
///
|
||||
/// This function is blind to whether the consolidations are valid and capable of being processed,
|
||||
/// it just returns the set of all indices present in consolidations. This is *sufficient* to
|
||||
/// make consolidations play nicely with effective balance updates. The algorithm used is:
|
||||
///
|
||||
/// - In the single pass: apply effective balance updates for all validators *not* referenced by
|
||||
/// consolidations.
|
||||
/// - Apply consolidations.
|
||||
/// - Apply effective balance updates for all validators previously skipped.
|
||||
///
|
||||
/// Prior to Electra, the empty set is returned.
|
||||
fn get_validators_in_consolidations<E: EthSpec>(state: &BeaconState<E>) -> BTreeSet<usize> {
|
||||
let mut referenced_validators = BTreeSet::new();
|
||||
|
||||
if let Ok(pending_consolidations) = state.pending_consolidations() {
|
||||
for pending_consolidation in pending_consolidations {
|
||||
referenced_validators.insert(pending_consolidation.source_index as usize);
|
||||
referenced_validators.insert(pending_consolidation.target_index as usize);
|
||||
}
|
||||
}
|
||||
|
||||
referenced_validators
|
||||
}
|
||||
|
||||
/// We process pending consolidations after all of single-pass epoch processing, and then patch up
|
||||
/// the effective balances for affected validators.
|
||||
///
|
||||
/// This is safe because processing consolidations does not depend on the `effective_balance`.
|
||||
fn process_pending_consolidations<E: EthSpec>(
|
||||
state: &mut BeaconState<E>,
|
||||
validators_in_consolidations: &BTreeSet<usize>,
|
||||
next_epoch_cache: &mut PreEpochCache,
|
||||
effective_balances_ctxt: &EffectiveBalancesContext,
|
||||
perform_effective_balance_updates: bool,
|
||||
@@ -1042,8 +1079,6 @@ fn process_pending_consolidations<E: EthSpec>(
|
||||
let next_epoch = state.next_epoch()?;
|
||||
let pending_consolidations = state.pending_consolidations()?.clone();
|
||||
|
||||
let mut affected_validators = BTreeSet::new();
|
||||
|
||||
for pending_consolidation in &pending_consolidations {
|
||||
let source_index = pending_consolidation.source_index as usize;
|
||||
let target_index = pending_consolidation.target_index as usize;
|
||||
@@ -1069,9 +1104,6 @@ fn process_pending_consolidations<E: EthSpec>(
|
||||
decrease_balance(state, source_index, source_effective_balance)?;
|
||||
increase_balance(state, target_index, source_effective_balance)?;
|
||||
|
||||
affected_validators.insert(source_index);
|
||||
affected_validators.insert(target_index);
|
||||
|
||||
next_pending_consolidation.safe_add_assign(1)?;
|
||||
}
|
||||
|
||||
@@ -1087,7 +1119,7 @@ fn process_pending_consolidations<E: EthSpec>(
|
||||
// Re-process effective balance updates for validators affected by consolidations.
|
||||
let (validators, balances, _, current_epoch_participation, _, progressive_balances, _, _) =
|
||||
state.mutable_validator_fields()?;
|
||||
for validator_index in affected_validators {
|
||||
for &validator_index in validators_in_consolidations {
|
||||
let balance = *balances
|
||||
.get(validator_index)
|
||||
.ok_or(BeaconStateError::UnknownValidator(validator_index))?;
|
||||
@@ -1129,6 +1161,28 @@ impl EffectiveBalancesContext {
|
||||
}
|
||||
}
|
||||
|
||||
/// This function is called for validators that do not have their effective balance updated as
|
||||
/// part of the single-pass loop. For these validators we compute their true effective balance
|
||||
/// update after processing consolidations. However, to maintain the invariants of the
|
||||
/// `PreEpochCache` we must register _some_ effective balance for them immediately.
|
||||
fn process_single_dummy_effective_balance_update(
|
||||
validator_index: usize,
|
||||
validator: &Cow<Validator>,
|
||||
next_epoch_cache: &mut PreEpochCache,
|
||||
state_ctxt: &StateContext,
|
||||
) -> Result<(), Error> {
|
||||
// Populate the effective balance cache with the current effective balance. This will be
|
||||
// overriden when `process_single_effective_balance_update` is called.
|
||||
let is_active_next_epoch = validator.is_active_at(state_ctxt.next_epoch);
|
||||
let temporary_effective_balance = validator.effective_balance;
|
||||
next_epoch_cache.update_effective_balance(
|
||||
validator_index,
|
||||
temporary_effective_balance,
|
||||
is_active_next_epoch,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This function abstracts over phase0 and Electra effective balance processing.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn process_single_effective_balance_update(
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "lcli"
|
||||
description = "Lighthouse CLI (modeled after zcli)"
|
||||
version = "7.0.0-beta.4"
|
||||
version = "7.0.0-beta.5"
|
||||
authors = ["Paul Hauner <paul@paulhauner.com>"]
|
||||
edition = { workspace = true }
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lighthouse"
|
||||
version = "7.0.0-beta.4"
|
||||
version = "7.0.0-beta.5"
|
||||
authors = ["Sigma Prime <contact@sigmaprime.io>"]
|
||||
edition = { workspace = true }
|
||||
autotests = false
|
||||
|
||||
Reference in New Issue
Block a user