mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-03 00:31:50 +00:00
Merge remote-tracking branch 'origin/stable' into unstable
This commit is contained in:
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -835,7 +835,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "beacon_node"
|
name = "beacon_node"
|
||||||
version = "5.2.1"
|
version = "5.3.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"beacon_chain",
|
"beacon_chain",
|
||||||
"clap",
|
"clap",
|
||||||
@@ -1057,7 +1057,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "boot_node"
|
name = "boot_node"
|
||||||
version = "5.2.1"
|
version = "5.3.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"beacon_node",
|
"beacon_node",
|
||||||
"clap",
|
"clap",
|
||||||
@@ -4438,7 +4438,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lcli"
|
name = "lcli"
|
||||||
version = "5.2.1"
|
version = "5.3.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"account_utils",
|
"account_utils",
|
||||||
"beacon_chain",
|
"beacon_chain",
|
||||||
@@ -5009,7 +5009,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lighthouse"
|
name = "lighthouse"
|
||||||
version = "5.2.1"
|
version = "5.3.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"account_manager",
|
"account_manager",
|
||||||
"account_utils",
|
"account_utils",
|
||||||
@@ -6576,8 +6576,7 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "quick-protobuf"
|
name = "quick-protobuf"
|
||||||
version = "0.8.1"
|
version = "0.8.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "git+https://github.com/sigp/quick-protobuf.git?rev=681f413312404ab6e51f0b46f39b0075c6f4ebfd#681f413312404ab6e51f0b46f39b0075c6f4ebfd"
|
||||||
checksum = "9d6da84cc204722a989e01ba2f6e1e276e190f22263d0cb6ce8526fcdb0d2e1f"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"byteorder",
|
"byteorder",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -107,7 +107,7 @@ bytes = "1"
|
|||||||
clap = { version = "4.5.4", features = ["derive", "cargo", "wrap_help"] }
|
clap = { version = "4.5.4", features = ["derive", "cargo", "wrap_help"] }
|
||||||
# Turn off c-kzg's default features which include `blst/portable`. We can turn on blst's portable
|
# Turn off c-kzg's default features which include `blst/portable`. We can turn on blst's portable
|
||||||
# feature ourselves when desired.
|
# feature ourselves when desired.
|
||||||
c-kzg = { version = "1", default-features = false }
|
c-kzg = { version = "1", default-features = false }
|
||||||
compare_fields_derive = { path = "common/compare_fields_derive" }
|
compare_fields_derive = { path = "common/compare_fields_derive" }
|
||||||
criterion = "0.5"
|
criterion = "0.5"
|
||||||
delay_map = "0.3"
|
delay_map = "0.3"
|
||||||
@@ -240,6 +240,9 @@ validator_client = { path = "validator_client" }
|
|||||||
validator_dir = { path = "common/validator_dir" }
|
validator_dir = { path = "common/validator_dir" }
|
||||||
warp_utils = { path = "common/warp_utils" }
|
warp_utils = { path = "common/warp_utils" }
|
||||||
|
|
||||||
|
[patch.crates-io]
|
||||||
|
quick-protobuf = { git = "https://github.com/sigp/quick-protobuf.git", rev = "681f413312404ab6e51f0b46f39b0075c6f4ebfd" }
|
||||||
|
|
||||||
[profile.maxperf]
|
[profile.maxperf]
|
||||||
inherits = "release"
|
inherits = "release"
|
||||||
lto = "fat"
|
lto = "fat"
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "beacon_node"
|
name = "beacon_node"
|
||||||
version = "5.2.1"
|
version = "5.3.0"
|
||||||
authors = [
|
authors = [
|
||||||
"Paul Hauner <paul@paulhauner.com>",
|
"Paul Hauner <paul@paulhauner.com>",
|
||||||
"Age Manning <Age@AgeManning.com",
|
"Age Manning <Age@AgeManning.com",
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ use fork_choice::{
|
|||||||
};
|
};
|
||||||
use itertools::process_results;
|
use itertools::process_results;
|
||||||
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||||
use slog::{crit, debug, error, warn, Logger};
|
use slog::{crit, debug, error, info, warn, Logger};
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
use state_processing::AllCaches;
|
use state_processing::AllCaches;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -1212,7 +1212,7 @@ fn detect_reorg<E: EthSpec>(
|
|||||||
&metrics::FORK_CHOICE_REORG_DISTANCE,
|
&metrics::FORK_CHOICE_REORG_DISTANCE,
|
||||||
reorg_distance.as_u64() as i64,
|
reorg_distance.as_u64() as i64,
|
||||||
);
|
);
|
||||||
warn!(
|
info!(
|
||||||
log,
|
log,
|
||||||
"Beacon chain re-org";
|
"Beacon chain re-org";
|
||||||
"previous_head" => ?old_block_root,
|
"previous_head" => ?old_block_root,
|
||||||
|
|||||||
@@ -1129,7 +1129,7 @@ impl Service {
|
|||||||
|
|
||||||
Ok(BlockCacheUpdateOutcome {
|
Ok(BlockCacheUpdateOutcome {
|
||||||
blocks_imported,
|
blocks_imported,
|
||||||
head_block_number: self.inner.block_cache.read().highest_block_number(),
|
head_block_number: block_cache.highest_block_number(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -111,9 +111,6 @@ pub struct SyncingChain<T: BeaconChainTypes> {
|
|||||||
/// The current processing batch, if any.
|
/// The current processing batch, if any.
|
||||||
current_processing_batch: Option<BatchId>,
|
current_processing_batch: Option<BatchId>,
|
||||||
|
|
||||||
/// Batches validated by this chain.
|
|
||||||
validated_batches: u64,
|
|
||||||
|
|
||||||
/// The chain's log.
|
/// The chain's log.
|
||||||
log: slog::Logger,
|
log: slog::Logger,
|
||||||
}
|
}
|
||||||
@@ -161,7 +158,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
|||||||
attempted_optimistic_starts: HashSet::default(),
|
attempted_optimistic_starts: HashSet::default(),
|
||||||
state: ChainSyncingState::Stopped,
|
state: ChainSyncingState::Stopped,
|
||||||
current_processing_batch: None,
|
current_processing_batch: None,
|
||||||
validated_batches: 0,
|
|
||||||
log: log.new(o!("chain" => id)),
|
log: log.new(o!("chain" => id)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -182,8 +178,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Progress in epochs made by the chain
|
/// Progress in epochs made by the chain
|
||||||
pub fn validated_epochs(&self) -> u64 {
|
pub fn processed_epochs(&self) -> u64 {
|
||||||
self.validated_batches * EPOCHS_PER_BATCH
|
self.processing_target
|
||||||
|
.saturating_sub(self.start_epoch)
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the total count of pending blocks in all the batches of this chain
|
/// Returns the total count of pending blocks in all the batches of this chain
|
||||||
@@ -655,7 +653,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
|||||||
let removed_batches = std::mem::replace(&mut self.batches, remaining_batches);
|
let removed_batches = std::mem::replace(&mut self.batches, remaining_batches);
|
||||||
|
|
||||||
for (id, batch) in removed_batches.into_iter() {
|
for (id, batch) in removed_batches.into_iter() {
|
||||||
self.validated_batches = self.validated_batches.saturating_add(1);
|
|
||||||
// only for batches awaiting validation can we be sure the last attempt is
|
// only for batches awaiting validation can we be sure the last attempt is
|
||||||
// right, and thus, that any different attempt is wrong
|
// right, and thus, that any different attempt is wrong
|
||||||
match batch.state() {
|
match batch.state() {
|
||||||
@@ -1212,7 +1209,6 @@ impl<T: BeaconChainTypes> slog::KV for SyncingChain<T> {
|
|||||||
)?;
|
)?;
|
||||||
serializer.emit_usize("batches", self.batches.len())?;
|
serializer.emit_usize("batches", self.batches.len())?;
|
||||||
serializer.emit_usize("peers", self.peers.len())?;
|
serializer.emit_usize("peers", self.peers.len())?;
|
||||||
serializer.emit_u64("validated_batches", self.validated_batches)?;
|
|
||||||
serializer.emit_arguments("state", &format_args!("{:?}", self.state))?;
|
serializer.emit_arguments("state", &format_args!("{:?}", self.state))?;
|
||||||
slog::Result::Ok(())
|
slog::Result::Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ use types::{Epoch, Hash256, Slot};
|
|||||||
const PARALLEL_HEAD_CHAINS: usize = 2;
|
const PARALLEL_HEAD_CHAINS: usize = 2;
|
||||||
|
|
||||||
/// Minimum work we require a finalized chain to do before picking a chain with more peers.
|
/// Minimum work we require a finalized chain to do before picking a chain with more peers.
|
||||||
const MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS: u64 = 10;
|
const MIN_FINALIZED_CHAIN_PROCESSED_EPOCHS: u64 = 10;
|
||||||
|
|
||||||
/// The state of the long range/batch sync.
|
/// The state of the long range/batch sync.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@@ -273,8 +273,8 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
|
|||||||
// chains are different, check that they don't have the same number of peers
|
// chains are different, check that they don't have the same number of peers
|
||||||
if let Some(syncing_chain) = self.finalized_chains.get_mut(&syncing_id) {
|
if let Some(syncing_chain) = self.finalized_chains.get_mut(&syncing_id) {
|
||||||
if max_peers > syncing_chain.available_peers()
|
if max_peers > syncing_chain.available_peers()
|
||||||
&& syncing_chain.validated_epochs()
|
&& syncing_chain.processed_epochs()
|
||||||
> MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS
|
> MIN_FINALIZED_CHAIN_PROCESSED_EPOCHS
|
||||||
{
|
{
|
||||||
syncing_chain.stop_syncing();
|
syncing_chain.stop_syncing();
|
||||||
old_id = Some(Some(syncing_id));
|
old_id = Some(Some(syncing_id));
|
||||||
|
|||||||
@@ -50,10 +50,10 @@ A pair of messages at `INFO` level will be logged if a re-org opportunity is det
|
|||||||
|
|
||||||
> INFO Proposing block to re-org current head head_to_reorg: 0xf64f…2b49, slot: 1105320
|
> INFO Proposing block to re-org current head head_to_reorg: 0xf64f…2b49, slot: 1105320
|
||||||
|
|
||||||
This should be followed shortly after by a `WARN` log indicating that a re-org occurred. This is
|
This should be followed shortly after by a `INFO` log indicating that a re-org occurred. This is
|
||||||
expected and normal:
|
expected and normal:
|
||||||
|
|
||||||
> WARN Beacon chain re-org reorg_distance: 1, new_slot: 1105320, new_head: 0x72791549e4ca792f91053bc7cf1e55c6fbe745f78ce7a16fc3acb6f09161becd, previous_slot: 1105319, previous_head: 0xf64f8e5ed617dc18c1e759dab5d008369767c3678416dac2fe1d389562842b49
|
> INFO Beacon chain re-org reorg_distance: 1, new_slot: 1105320, new_head: 0x72791549e4ca792f91053bc7cf1e55c6fbe745f78ce7a16fc3acb6f09161becd, previous_slot: 1105319, previous_head: 0xf64f8e5ed617dc18c1e759dab5d008369767c3678416dac2fe1d389562842b49
|
||||||
|
|
||||||
In case a re-org is not viable (which should be most of the time), Lighthouse will just propose a
|
In case a re-org is not viable (which should be most of the time), Lighthouse will just propose a
|
||||||
block as normal and log the reason the re-org was not attempted at debug level:
|
block as normal and log the reason the re-org was not attempted at debug level:
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "boot_node"
|
name = "boot_node"
|
||||||
version = "5.2.1"
|
version = "5.3.0"
|
||||||
authors = ["Sigma Prime <contact@sigmaprime.io>"]
|
authors = ["Sigma Prime <contact@sigmaprime.io>"]
|
||||||
edition = { workspace = true }
|
edition = { workspace = true }
|
||||||
|
|
||||||
|
|||||||
@@ -121,6 +121,7 @@ impl fmt::Display for Error {
|
|||||||
pub struct Timeouts {
|
pub struct Timeouts {
|
||||||
pub attestation: Duration,
|
pub attestation: Duration,
|
||||||
pub attester_duties: Duration,
|
pub attester_duties: Duration,
|
||||||
|
pub attestation_subscriptions: Duration,
|
||||||
pub liveness: Duration,
|
pub liveness: Duration,
|
||||||
pub proposal: Duration,
|
pub proposal: Duration,
|
||||||
pub proposer_duties: Duration,
|
pub proposer_duties: Duration,
|
||||||
@@ -137,6 +138,7 @@ impl Timeouts {
|
|||||||
Timeouts {
|
Timeouts {
|
||||||
attestation: timeout,
|
attestation: timeout,
|
||||||
attester_duties: timeout,
|
attester_duties: timeout,
|
||||||
|
attestation_subscriptions: timeout,
|
||||||
liveness: timeout,
|
liveness: timeout,
|
||||||
proposal: timeout,
|
proposal: timeout,
|
||||||
proposer_duties: timeout,
|
proposer_duties: timeout,
|
||||||
@@ -2540,7 +2542,12 @@ impl BeaconNodeHttpClient {
|
|||||||
.push("validator")
|
.push("validator")
|
||||||
.push("beacon_committee_subscriptions");
|
.push("beacon_committee_subscriptions");
|
||||||
|
|
||||||
self.post(path, &subscriptions).await?;
|
self.post_with_timeout(
|
||||||
|
path,
|
||||||
|
&subscriptions,
|
||||||
|
self.timeouts.attestation_subscriptions,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,8 +17,8 @@ pub const VERSION: &str = git_version!(
|
|||||||
// NOTE: using --match instead of --exclude for compatibility with old Git
|
// NOTE: using --match instead of --exclude for compatibility with old Git
|
||||||
"--match=thiswillnevermatchlol"
|
"--match=thiswillnevermatchlol"
|
||||||
],
|
],
|
||||||
prefix = "Lighthouse/v5.2.1-",
|
prefix = "Lighthouse/v5.3.0-",
|
||||||
fallback = "Lighthouse/v5.2.1"
|
fallback = "Lighthouse/v5.3.0"
|
||||||
);
|
);
|
||||||
|
|
||||||
/// Returns the first eight characters of the latest commit hash for this build.
|
/// Returns the first eight characters of the latest commit hash for this build.
|
||||||
|
|||||||
@@ -1372,10 +1372,13 @@ pub struct Config {
|
|||||||
#[serde(with = "serde_utils::quoted_u64")]
|
#[serde(with = "serde_utils::quoted_u64")]
|
||||||
max_per_epoch_activation_exit_churn_limit: u64,
|
max_per_epoch_activation_exit_churn_limit: u64,
|
||||||
|
|
||||||
|
#[serde(default = "default_custody_requirement")]
|
||||||
#[serde(with = "serde_utils::quoted_u64")]
|
#[serde(with = "serde_utils::quoted_u64")]
|
||||||
custody_requirement: u64,
|
custody_requirement: u64,
|
||||||
|
#[serde(default = "default_data_column_sidecar_subnet_count")]
|
||||||
#[serde(with = "serde_utils::quoted_u64")]
|
#[serde(with = "serde_utils::quoted_u64")]
|
||||||
data_column_sidecar_subnet_count: u64,
|
data_column_sidecar_subnet_count: u64,
|
||||||
|
#[serde(default = "default_number_of_columns")]
|
||||||
#[serde(with = "serde_utils::quoted_u64")]
|
#[serde(with = "serde_utils::quoted_u64")]
|
||||||
number_of_columns: u64,
|
number_of_columns: u64,
|
||||||
}
|
}
|
||||||
@@ -1516,6 +1519,18 @@ const fn default_maximum_gossip_clock_disparity_millis() -> u64 {
|
|||||||
500
|
500
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const fn default_custody_requirement() -> u64 {
|
||||||
|
1
|
||||||
|
}
|
||||||
|
|
||||||
|
const fn default_data_column_sidecar_subnet_count() -> u64 {
|
||||||
|
32
|
||||||
|
}
|
||||||
|
|
||||||
|
const fn default_number_of_columns() -> u64 {
|
||||||
|
128
|
||||||
|
}
|
||||||
|
|
||||||
fn max_blocks_by_root_request_common(max_request_blocks: u64) -> usize {
|
fn max_blocks_by_root_request_common(max_request_blocks: u64) -> usize {
|
||||||
let max_request_blocks = max_request_blocks as usize;
|
let max_request_blocks = max_request_blocks as usize;
|
||||||
RuntimeVariableList::<Hash256>::from_vec(
|
RuntimeVariableList::<Hash256>::from_vec(
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "lcli"
|
name = "lcli"
|
||||||
description = "Lighthouse CLI (modeled after zcli)"
|
description = "Lighthouse CLI (modeled after zcli)"
|
||||||
version = "5.2.1"
|
version = "5.3.0"
|
||||||
authors = ["Paul Hauner <paul@paulhauner.com>"]
|
authors = ["Paul Hauner <paul@paulhauner.com>"]
|
||||||
edition = { workspace = true }
|
edition = { workspace = true }
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "lighthouse"
|
name = "lighthouse"
|
||||||
version = "5.2.1"
|
version = "5.3.0"
|
||||||
authors = ["Sigma Prime <contact@sigmaprime.io>"]
|
authors = ["Sigma Prime <contact@sigmaprime.io>"]
|
||||||
edition = { workspace = true }
|
edition = { workspace = true }
|
||||||
autotests = false
|
autotests = false
|
||||||
|
|||||||
@@ -165,8 +165,12 @@ impl<'env> Cursor<'env> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_current(&mut self) -> Result<Option<(Key<'env>, Value<'env>)>, Error> {
|
pub fn get_current(&mut self) -> Result<Option<(Key<'env>, Value<'env>)>, Error> {
|
||||||
|
// FIXME: lmdb has an extremely broken API which can mutate the SHARED REFERENCE
|
||||||
|
// `value` after `get_current` is called. We need to convert it to a Vec here in order
|
||||||
|
// to avoid `value` changing after another cursor operation. I think this represents a bug
|
||||||
|
// in the LMDB bindings, as shared references should be immutable.
|
||||||
if let Some((Some(key), value)) = self.cursor.get(None, None, MDB_GET_CURRENT).optional()? {
|
if let Some((Some(key), value)) = self.cursor.get(None, None, MDB_GET_CURRENT).optional()? {
|
||||||
Ok(Some((Cow::Borrowed(key), Cow::Borrowed(value))))
|
Ok(Some((Cow::Borrowed(key), Cow::Owned(value.to_vec()))))
|
||||||
} else {
|
} else {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -235,3 +235,8 @@ fn no_crash_blocks_example1() {
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn no_crash_aug_24() {
|
||||||
|
random_test(13519442335106054152, TestConfig::default())
|
||||||
|
}
|
||||||
|
|||||||
@@ -134,6 +134,12 @@ impl<T: Debug> fmt::Display for Errors<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T> Errors<T> {
|
||||||
|
pub fn num_errors(&self) -> usize {
|
||||||
|
self.0.len()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Reasons why a candidate might not be ready.
|
/// Reasons why a candidate might not be ready.
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
pub enum CandidateError {
|
pub enum CandidateError {
|
||||||
@@ -599,46 +605,41 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
|
|||||||
F: Fn(&'a BeaconNodeHttpClient) -> R,
|
F: Fn(&'a BeaconNodeHttpClient) -> R,
|
||||||
R: Future<Output = Result<O, Err>>,
|
R: Future<Output = Result<O, Err>>,
|
||||||
{
|
{
|
||||||
let mut results = vec![];
|
|
||||||
let mut to_retry = vec![];
|
let mut to_retry = vec![];
|
||||||
let mut retry_unsynced = vec![];
|
let mut retry_unsynced = vec![];
|
||||||
|
|
||||||
// Run `func` using a `candidate`, returning the value or capturing errors.
|
// Run `func` using a `candidate`, returning the value or capturing errors.
|
||||||
//
|
let run_on_candidate = |candidate: &'a CandidateBeaconNode<E>| async {
|
||||||
// We use a macro instead of a closure here since it is not trivial to move `func` into a
|
inc_counter_vec(&ENDPOINT_REQUESTS, &[candidate.beacon_node.as_ref()]);
|
||||||
// closure.
|
|
||||||
macro_rules! try_func {
|
|
||||||
($candidate: ident) => {{
|
|
||||||
inc_counter_vec(&ENDPOINT_REQUESTS, &[$candidate.beacon_node.as_ref()]);
|
|
||||||
|
|
||||||
// There exists a race condition where `func` may be called when the candidate is
|
// There exists a race condition where `func` may be called when the candidate is
|
||||||
// actually not ready. We deem this an acceptable inefficiency.
|
// actually not ready. We deem this an acceptable inefficiency.
|
||||||
match func(&$candidate.beacon_node).await {
|
match func(&candidate.beacon_node).await {
|
||||||
Ok(val) => results.push(Ok(val)),
|
Ok(val) => Ok(val),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// If we have an error on this function, make the client as not-ready.
|
// If we have an error on this function, mark the client as not-ready.
|
||||||
//
|
//
|
||||||
// There exists a race condition where the candidate may have been marked
|
// There exists a race condition where the candidate may have been marked
|
||||||
// as ready between the `func` call and now. We deem this an acceptable
|
// as ready between the `func` call and now. We deem this an acceptable
|
||||||
// inefficiency.
|
// inefficiency.
|
||||||
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
|
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
|
||||||
$candidate.set_offline().await;
|
candidate.set_offline().await;
|
||||||
}
|
|
||||||
results.push(Err((
|
|
||||||
$candidate.beacon_node.to_string(),
|
|
||||||
Error::RequestFailed(e),
|
|
||||||
)));
|
|
||||||
inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]);
|
|
||||||
}
|
}
|
||||||
|
inc_counter_vec(&ENDPOINT_ERRORS, &[candidate.beacon_node.as_ref()]);
|
||||||
|
Err((candidate.beacon_node.to_string(), Error::RequestFailed(e)))
|
||||||
}
|
}
|
||||||
}};
|
}
|
||||||
}
|
};
|
||||||
|
|
||||||
// First pass: try `func` on all synced and ready candidates.
|
// First pass: try `func` on all synced and ready candidates.
|
||||||
//
|
//
|
||||||
// This ensures that we always choose a synced node if it is available.
|
// This ensures that we always choose a synced node if it is available.
|
||||||
|
let mut first_batch_futures = vec![];
|
||||||
for candidate in &self.candidates {
|
for candidate in &self.candidates {
|
||||||
match candidate.status(RequireSynced::Yes).await {
|
match candidate.status(RequireSynced::Yes).await {
|
||||||
|
Ok(_) => {
|
||||||
|
first_batch_futures.push(run_on_candidate(candidate));
|
||||||
|
}
|
||||||
Err(CandidateError::NotSynced) if require_synced == false => {
|
Err(CandidateError::NotSynced) if require_synced == false => {
|
||||||
// This client is unsynced we will try it after trying all synced clients
|
// This client is unsynced we will try it after trying all synced clients
|
||||||
retry_unsynced.push(candidate);
|
retry_unsynced.push(candidate);
|
||||||
@@ -647,22 +648,24 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
|
|||||||
// This client was not ready on the first pass, we might try it again later.
|
// This client was not ready on the first pass, we might try it again later.
|
||||||
to_retry.push(candidate);
|
to_retry.push(candidate);
|
||||||
}
|
}
|
||||||
Ok(_) => try_func!(candidate),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
let first_batch_results = futures::future::join_all(first_batch_futures).await;
|
||||||
|
|
||||||
// Second pass: try `func` on ready unsynced candidates. This only runs if we permit
|
// Second pass: try `func` on ready unsynced candidates. This only runs if we permit
|
||||||
// unsynced candidates.
|
// unsynced candidates.
|
||||||
//
|
//
|
||||||
// Due to async race-conditions, it is possible that we will send a request to a candidate
|
// Due to async race-conditions, it is possible that we will send a request to a candidate
|
||||||
// that has been set to an offline/unready status. This is acceptable.
|
// that has been set to an offline/unready status. This is acceptable.
|
||||||
if require_synced == false {
|
let second_batch_results = if require_synced == false {
|
||||||
for candidate in retry_unsynced {
|
futures::future::join_all(retry_unsynced.into_iter().map(run_on_candidate)).await
|
||||||
try_func!(candidate);
|
} else {
|
||||||
}
|
vec![]
|
||||||
}
|
};
|
||||||
|
|
||||||
// Third pass: try again, attempting to make non-ready clients become ready.
|
// Third pass: try again, attempting to make non-ready clients become ready.
|
||||||
|
let mut third_batch_futures = vec![];
|
||||||
|
let mut third_batch_results = vec![];
|
||||||
for candidate in to_retry {
|
for candidate in to_retry {
|
||||||
// If the candidate hasn't luckily transferred into the correct state in the meantime,
|
// If the candidate hasn't luckily transferred into the correct state in the meantime,
|
||||||
// force an update of the state.
|
// force an update of the state.
|
||||||
@@ -676,16 +679,21 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
match new_status {
|
match new_status {
|
||||||
Ok(()) => try_func!(candidate),
|
Ok(()) => third_batch_futures.push(run_on_candidate(candidate)),
|
||||||
Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate),
|
Err(CandidateError::NotSynced) if require_synced == false => {
|
||||||
Err(e) => {
|
third_batch_futures.push(run_on_candidate(candidate))
|
||||||
results.push(Err((
|
|
||||||
candidate.beacon_node.to_string(),
|
|
||||||
Error::Unavailable(e),
|
|
||||||
)));
|
|
||||||
}
|
}
|
||||||
|
Err(e) => third_batch_results.push(Err((
|
||||||
|
candidate.beacon_node.to_string(),
|
||||||
|
Error::Unavailable(e),
|
||||||
|
))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
third_batch_results.extend(futures::future::join_all(third_batch_futures).await);
|
||||||
|
|
||||||
|
let mut results = first_batch_results;
|
||||||
|
results.extend(second_batch_results);
|
||||||
|
results.extend(third_batch_results);
|
||||||
|
|
||||||
let errors: Vec<_> = results.into_iter().filter_map(|res| res.err()).collect();
|
let errors: Vec<_> = results.into_iter().filter_map(|res| res.err()).collect();
|
||||||
|
|
||||||
|
|||||||
@@ -86,7 +86,8 @@ const _: () = assert!({
|
|||||||
/// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the
|
/// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the
|
||||||
/// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid
|
/// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid
|
||||||
/// bringing in the entire crate.
|
/// bringing in the entire crate.
|
||||||
const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > 2);
|
const MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD: u64 = 2;
|
||||||
|
const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD);
|
||||||
|
|
||||||
// The info in the enum variants is displayed in logging, clippy thinks it's dead code.
|
// The info in the enum variants is displayed in logging, clippy thinks it's dead code.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -121,6 +122,8 @@ pub struct DutyAndProof {
|
|||||||
pub struct SubscriptionSlots {
|
pub struct SubscriptionSlots {
|
||||||
/// Pairs of `(slot, already_sent)` in slot-descending order.
|
/// Pairs of `(slot, already_sent)` in slot-descending order.
|
||||||
slots: Vec<(Slot, AtomicBool)>,
|
slots: Vec<(Slot, AtomicBool)>,
|
||||||
|
/// The slot of the duty itself.
|
||||||
|
duty_slot: Slot,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a selection proof for `duty`.
|
/// Create a selection proof for `duty`.
|
||||||
@@ -172,18 +175,20 @@ impl SubscriptionSlots {
|
|||||||
.filter(|scheduled_slot| *scheduled_slot > current_slot)
|
.filter(|scheduled_slot| *scheduled_slot > current_slot)
|
||||||
.map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false)))
|
.map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false)))
|
||||||
.collect();
|
.collect();
|
||||||
Arc::new(Self { slots })
|
Arc::new(Self { slots, duty_slot })
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return `true` if we should send a subscription at `slot`.
|
/// Return `true` if we should send a subscription at `slot`.
|
||||||
fn should_send_subscription_at(&self, slot: Slot) -> bool {
|
fn should_send_subscription_at(&self, slot: Slot) -> bool {
|
||||||
// Iterate slots from smallest to largest looking for one that hasn't been completed yet.
|
// Iterate slots from smallest to largest looking for one that hasn't been completed yet.
|
||||||
self.slots
|
slot + MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD <= self.duty_slot
|
||||||
.iter()
|
&& self
|
||||||
.rev()
|
.slots
|
||||||
.any(|(scheduled_slot, already_sent)| {
|
.iter()
|
||||||
slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed)
|
.rev()
|
||||||
})
|
.any(|(scheduled_slot, already_sent)| {
|
||||||
|
slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Update our record of subscribed slots to account for successful subscription at `slot`.
|
/// Update our record of subscribed slots to account for successful subscription at `slot`.
|
||||||
@@ -737,7 +742,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
// If there are any subscriptions, push them out to beacon nodes
|
// If there are any subscriptions, push them out to beacon nodes
|
||||||
if !subscriptions.is_empty() {
|
if !subscriptions.is_empty() {
|
||||||
let subscriptions_ref = &subscriptions;
|
let subscriptions_ref = &subscriptions;
|
||||||
if let Err(e) = duties_service
|
let subscription_result = duties_service
|
||||||
.beacon_nodes
|
.beacon_nodes
|
||||||
.request(
|
.request(
|
||||||
RequireSynced::No,
|
RequireSynced::No,
|
||||||
@@ -753,15 +758,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
.await
|
.await
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.await
|
.await;
|
||||||
{
|
if subscription_result.as_ref().is_ok() {
|
||||||
error!(
|
|
||||||
log,
|
|
||||||
"Failed to subscribe validators";
|
|
||||||
"error" => %e
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
// Record that subscriptions were successfully sent.
|
|
||||||
debug!(
|
debug!(
|
||||||
log,
|
log,
|
||||||
"Broadcast attestation subscriptions";
|
"Broadcast attestation subscriptions";
|
||||||
@@ -770,6 +768,25 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
for subscription_slots in subscription_slots_to_confirm {
|
for subscription_slots in subscription_slots_to_confirm {
|
||||||
subscription_slots.record_successful_subscription_at(current_slot);
|
subscription_slots.record_successful_subscription_at(current_slot);
|
||||||
}
|
}
|
||||||
|
} else if let Err(e) = subscription_result {
|
||||||
|
if e.num_errors() < duties_service.beacon_nodes.num_total() {
|
||||||
|
warn!(
|
||||||
|
log,
|
||||||
|
"Some subscriptions failed";
|
||||||
|
"error" => %e,
|
||||||
|
);
|
||||||
|
// If subscriptions were sent to at least one node, regard that as a success.
|
||||||
|
// There is some redundancy built into the subscription schedule to handle failures.
|
||||||
|
for subscription_slots in subscription_slots_to_confirm {
|
||||||
|
subscription_slots.record_successful_subscription_at(current_slot);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
error!(
|
||||||
|
log,
|
||||||
|
"All subscriptions failed";
|
||||||
|
"error" => %e
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -75,6 +75,7 @@ const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12);
|
|||||||
/// This can help ensure that proper endpoint fallback occurs.
|
/// This can help ensure that proper endpoint fallback occurs.
|
||||||
const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4;
|
const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4;
|
||||||
const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
|
const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
|
||||||
|
const HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT: u32 = 24;
|
||||||
const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4;
|
const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4;
|
||||||
const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
|
const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
|
||||||
const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
|
const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
|
||||||
@@ -323,6 +324,8 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
|
|||||||
Timeouts {
|
Timeouts {
|
||||||
attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT,
|
attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT,
|
||||||
attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
|
attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
|
||||||
|
attestation_subscriptions: slot_duration
|
||||||
|
/ HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT,
|
||||||
liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
|
liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
|
||||||
proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
|
proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
|
||||||
proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,
|
proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,
|
||||||
|
|||||||
Reference in New Issue
Block a user