Merge branch 'unstable' into eip4844

This commit is contained in:
Diva M
2023-03-24 13:32:50 -05:00
48 changed files with 484 additions and 501 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "beacon_node"
version = "3.5.1"
version = "4.0.1-rc.0"
authors = ["Paul Hauner <paul@paulhauner.com>", "Age Manning <Age@AgeManning.com"]
edition = "2021"

View File

@@ -76,7 +76,7 @@ use itertools::process_results;
use itertools::Itertools;
use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella};
use parking_lot::{Mutex, RwLock};
use proto_array::{CountUnrealizedFull, DoNotReOrg, ProposerHeadError};
use proto_array::{DoNotReOrg, ProposerHeadError};
use safe_arith::SafeArith;
use slasher::Slasher;
use slog::{crit, debug, error, info, trace, warn, Logger};
@@ -485,7 +485,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn load_fork_choice(
store: BeaconStore<T>,
reset_payload_statuses: ResetPayloadStatuses,
count_unrealized_full: CountUnrealizedFull,
spec: &ChainSpec,
log: &Logger,
) -> Result<Option<BeaconForkChoice<T>>, Error> {
@@ -502,7 +501,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
persisted_fork_choice.fork_choice,
reset_payload_statuses,
fc_store,
count_unrealized_full,
spec,
log,
)?))
@@ -1986,7 +1984,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.slot()?,
verified.indexed_attestation(),
AttestationFromBlock::False,
&self.spec,
)
.map_err(Into::into)
}
@@ -2953,7 +2950,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&state,
payload_verification_status,
&self.spec,
count_unrealized.and(self.config.count_unrealized.into()),
count_unrealized,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
}
@@ -3096,7 +3093,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ResetPayloadStatuses::always_reset_conditionally(
self.config.always_reset_payload_statuses,
),
self.config.count_unrealized_full,
&self.store,
&self.spec,
&self.log,

View File

@@ -20,6 +20,14 @@ use types::{
Hash256, Slot,
};
/// Ensure this justified checkpoint has an epoch of 0 so that it is never
/// greater than the justified checkpoint and enshrined as the actual justified
/// checkpoint.
const JUNK_BEST_JUSTIFIED_CHECKPOINT: Checkpoint = Checkpoint {
epoch: Epoch::new(0),
root: Hash256::repeat_byte(0),
};
#[derive(Debug)]
pub enum Error {
UnableToReadSlot,
@@ -144,7 +152,6 @@ pub struct BeaconForkChoiceStore<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<
finalized_checkpoint: Checkpoint,
justified_checkpoint: Checkpoint,
justified_balances: JustifiedBalances,
best_justified_checkpoint: Checkpoint,
unrealized_justified_checkpoint: Checkpoint,
unrealized_finalized_checkpoint: Checkpoint,
proposer_boost_root: Hash256,
@@ -194,7 +201,6 @@ where
justified_checkpoint,
justified_balances,
finalized_checkpoint,
best_justified_checkpoint: justified_checkpoint,
unrealized_justified_checkpoint: justified_checkpoint,
unrealized_finalized_checkpoint: finalized_checkpoint,
proposer_boost_root: Hash256::zero(),
@@ -212,7 +218,7 @@ where
finalized_checkpoint: self.finalized_checkpoint,
justified_checkpoint: self.justified_checkpoint,
justified_balances: self.justified_balances.effective_balances.clone(),
best_justified_checkpoint: self.best_justified_checkpoint,
best_justified_checkpoint: JUNK_BEST_JUSTIFIED_CHECKPOINT,
unrealized_justified_checkpoint: self.unrealized_justified_checkpoint,
unrealized_finalized_checkpoint: self.unrealized_finalized_checkpoint,
proposer_boost_root: self.proposer_boost_root,
@@ -234,7 +240,6 @@ where
finalized_checkpoint: persisted.finalized_checkpoint,
justified_checkpoint: persisted.justified_checkpoint,
justified_balances,
best_justified_checkpoint: persisted.best_justified_checkpoint,
unrealized_justified_checkpoint: persisted.unrealized_justified_checkpoint,
unrealized_finalized_checkpoint: persisted.unrealized_finalized_checkpoint,
proposer_boost_root: persisted.proposer_boost_root,
@@ -277,10 +282,6 @@ where
&self.justified_balances
}
fn best_justified_checkpoint(&self) -> &Checkpoint {
&self.best_justified_checkpoint
}
fn finalized_checkpoint(&self) -> &Checkpoint {
&self.finalized_checkpoint
}
@@ -333,10 +334,6 @@ where
Ok(())
}
fn set_best_justified_checkpoint(&mut self, checkpoint: Checkpoint) {
self.best_justified_checkpoint = checkpoint
}
fn set_unrealized_justified_checkpoint(&mut self, checkpoint: Checkpoint) {
self.unrealized_justified_checkpoint = checkpoint;
}

View File

@@ -1550,7 +1550,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
current_slot,
indexed_attestation,
AttestationFromBlock::True,
&chain.spec,
) {
Ok(()) => Ok(()),
// Ignore invalid attestations whilst importing attestations from a block. The

View File

@@ -19,7 +19,7 @@ use crate::{
};
use eth1::Config as Eth1Config;
use execution_layer::ExecutionLayer;
use fork_choice::{ForkChoice, ResetPayloadStatuses};
use fork_choice::{CountUnrealized, ForkChoice, ResetPayloadStatuses};
use futures::channel::mpsc::Sender;
use kzg::{Kzg, TrustedSetup};
use operation_pool::{OperationPool, PersistedOperationPool};
@@ -269,7 +269,6 @@ where
ResetPayloadStatuses::always_reset_conditionally(
self.chain_config.always_reset_payload_statuses,
),
self.chain_config.count_unrealized_full,
&self.spec,
log,
)
@@ -388,7 +387,6 @@ where
&genesis.beacon_block,
&genesis.beacon_state,
current_slot,
self.chain_config.count_unrealized_full,
&self.spec,
)
.map_err(|e| format!("Unable to initialize ForkChoice: {:?}", e))?;
@@ -507,7 +505,6 @@ where
&snapshot.beacon_block,
&snapshot.beacon_state,
current_slot,
self.chain_config.count_unrealized_full,
&self.spec,
)
.map_err(|e| format!("Unable to initialize ForkChoice: {:?}", e))?;
@@ -698,8 +695,7 @@ where
store.clone(),
Some(current_slot),
&self.spec,
self.chain_config.count_unrealized.into(),
self.chain_config.count_unrealized_full,
CountUnrealized::True,
)?;
}
@@ -782,6 +778,7 @@ where
let genesis_time = head_snapshot.beacon_state.genesis_time();
let head_for_snapshot_cache = head_snapshot.clone();
let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot));
let shuffling_cache_size = self.chain_config.shuffling_cache_size;
let beacon_chain = BeaconChain {
spec: self.spec,
@@ -835,7 +832,7 @@ where
DEFAULT_SNAPSHOT_CACHE_SIZE,
head_for_snapshot_cache,
)),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new(shuffling_cache_size)),
eth1_finalization_cache: TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())),
beacon_proposer_cache: <_>::default(),
block_times_cache: <_>::default(),

View File

@@ -45,8 +45,7 @@ use crate::{
};
use eth2::types::{EventKind, SseChainReorg, SseFinalizedCheckpoint, SseHead, SseLateHead};
use fork_choice::{
CountUnrealizedFull, ExecutionStatus, ForkChoiceView, ForkchoiceUpdateParameters, ProtoBlock,
ResetPayloadStatuses,
ExecutionStatus, ForkChoiceView, ForkchoiceUpdateParameters, ProtoBlock, ResetPayloadStatuses,
};
use itertools::process_results;
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
@@ -285,19 +284,13 @@ impl<T: BeaconChainTypes> CanonicalHead<T> {
// defensive programming.
mut fork_choice_write_lock: RwLockWriteGuard<BeaconForkChoice<T>>,
reset_payload_statuses: ResetPayloadStatuses,
count_unrealized_full: CountUnrealizedFull,
store: &BeaconStore<T>,
spec: &ChainSpec,
log: &Logger,
) -> Result<(), Error> {
let fork_choice = <BeaconChain<T>>::load_fork_choice(
store.clone(),
reset_payload_statuses,
count_unrealized_full,
spec,
log,
)?
.ok_or(Error::MissingPersistedForkChoice)?;
let fork_choice =
<BeaconChain<T>>::load_fork_choice(store.clone(), reset_payload_statuses, spec, log)?
.ok_or(Error::MissingPersistedForkChoice)?;
let fork_choice_view = fork_choice.cached_fork_choice_view();
let beacon_block_root = fork_choice_view.head_block_root;
let beacon_block = store

View File

@@ -1,4 +1,4 @@
pub use proto_array::{CountUnrealizedFull, ReOrgThreshold};
pub use proto_array::ReOrgThreshold;
use serde_derive::{Deserialize, Serialize};
use std::time::Duration;
use types::{Checkpoint, Epoch};
@@ -48,16 +48,11 @@ pub struct ChainConfig {
pub builder_fallback_epochs_since_finalization: usize,
/// Whether any chain health checks should be considered when deciding whether to use the builder API.
pub builder_fallback_disable_checks: bool,
/// When set to `true`, weigh the "unrealized" FFG progression when choosing a head in fork
/// choice.
pub count_unrealized: bool,
/// When set to `true`, forget any valid/invalid/optimistic statuses in fork choice during start
/// up.
pub always_reset_payload_statuses: bool,
/// Whether to apply paranoid checks to blocks proposed by this beacon node.
pub paranoid_block_proposal: bool,
/// Whether to strictly count unrealized justified votes.
pub count_unrealized_full: CountUnrealizedFull,
/// Optionally set timeout for calls to checkpoint sync endpoint.
pub checkpoint_sync_url_timeout: u64,
/// The offset before the start of a proposal slot at which payload attributes should be sent.
@@ -67,6 +62,8 @@ pub struct ChainConfig {
pub prepare_payload_lookahead: Duration,
/// Use EL-free optimistic sync for the finalized part of the chain.
pub optimistic_finalized_sync: bool,
/// The size of the shuffling cache,
pub shuffling_cache_size: usize,
/// Whether to send payload attributes every slot, regardless of connected proposers.
///
/// This is useful for block builders and testing.
@@ -89,14 +86,13 @@ impl Default for ChainConfig {
builder_fallback_skips_per_epoch: 8,
builder_fallback_epochs_since_finalization: 3,
builder_fallback_disable_checks: false,
count_unrealized: true,
always_reset_payload_statuses: false,
paranoid_block_proposal: false,
count_unrealized_full: CountUnrealizedFull::default(),
checkpoint_sync_url_timeout: 60,
prepare_payload_lookahead: Duration::from_secs(4),
// This value isn't actually read except in tests.
optimistic_finalized_sync: true,
shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE,
always_prepare_payload: false,
}
}

View File

@@ -1,7 +1,6 @@
use crate::{BeaconForkChoiceStore, BeaconSnapshot};
use fork_choice::{CountUnrealized, ForkChoice, PayloadVerificationStatus};
use itertools::process_results;
use proto_array::CountUnrealizedFull;
use slog::{info, warn, Logger};
use state_processing::state_advance::complete_state_advance;
use state_processing::{
@@ -102,7 +101,6 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
current_slot: Option<Slot>,
spec: &ChainSpec,
count_unrealized_config: CountUnrealized,
count_unrealized_full_config: CountUnrealizedFull,
) -> Result<ForkChoice<BeaconForkChoiceStore<E, Hot, Cold>, E>, String> {
// Fetch finalized block.
let finalized_checkpoint = head_state.finalized_checkpoint();
@@ -156,7 +154,6 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
&finalized_snapshot.beacon_block,
&finalized_snapshot.beacon_state,
current_slot,
count_unrealized_full_config,
spec,
)
.map_err(|e| format!("Unable to reset fork choice for revert: {:?}", e))?;

View File

@@ -43,7 +43,7 @@ mod persisted_fork_choice;
mod pre_finalization_cache;
pub mod proposer_prep_service;
pub mod schema_change;
mod shuffling_cache;
pub mod shuffling_cache;
mod snapshot_cache;
pub mod state_advance_timer;
pub mod sync_committee_rewards;
@@ -60,7 +60,7 @@ pub use self::beacon_chain::{
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
};
pub use self::beacon_snapshot::BeaconSnapshot;
pub use self::chain_config::{ChainConfig, CountUnrealizedFull};
pub use self::chain_config::ChainConfig;
pub use self::errors::{BeaconChainError, BlockProductionError};
pub use self::historical_blocks::HistoricalBlockError;
pub use attestation_verification::Error as AttestationError;

View File

@@ -3,6 +3,7 @@ mod migration_schema_v12;
mod migration_schema_v13;
mod migration_schema_v14;
mod migration_schema_v15;
mod migration_schema_v16;
use crate::beacon_chain::{BeaconChainTypes, ETH1_CACHE_DB_KEY};
use crate::eth1_chain::SszEth1;
@@ -132,6 +133,14 @@ pub fn migrate_schema<T: BeaconChainTypes>(
let ops = migration_schema_v15::downgrade_from_v15::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(15), SchemaVersion(16)) => {
let ops = migration_schema_v16::upgrade_to_v16::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(16), SchemaVersion(15)) => {
let ops = migration_schema_v16::downgrade_from_v16::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,

View File

@@ -0,0 +1,46 @@
use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY};
use crate::persisted_fork_choice::PersistedForkChoiceV11;
use slog::{debug, Logger};
use std::sync::Arc;
use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem};
pub fn upgrade_to_v16<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
drop_balances_cache::<T>(db, log)
}
pub fn downgrade_from_v16<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
drop_balances_cache::<T>(db, log)
}
/// Drop the balances cache from the fork choice store.
///
/// There aren't any type-level changes in this schema migration, however the
/// way that we compute the `JustifiedBalances` has changed due to:
/// https://github.com/sigp/lighthouse/pull/3962
pub fn drop_balances_cache<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
let mut persisted_fork_choice = db
.get_item::<PersistedForkChoiceV11>(&FORK_CHOICE_DB_KEY)?
.ok_or_else(|| Error::SchemaMigrationError("fork choice missing from database".into()))?;
debug!(
log,
"Dropping fork choice balances cache";
"item_count" => persisted_fork_choice.fork_choice_store.balances_cache.items.len()
);
// Drop all items in the balances cache.
persisted_fork_choice.fork_choice_store.balances_cache = <_>::default();
let kv_op = persisted_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY);
Ok(vec![kv_op])
}

View File

@@ -9,7 +9,7 @@ use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256
/// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash +
/// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this
/// ignores a few extra bytes in the caches that should be insignificant compared to the indices).
const CACHE_SIZE: usize = 16;
pub const DEFAULT_CACHE_SIZE: usize = 16;
/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this
/// limits the number of concurrent states that can be loaded into memory for the committee cache.
@@ -54,9 +54,9 @@ pub struct ShufflingCache {
}
impl ShufflingCache {
pub fn new() -> Self {
pub fn new(cache_size: usize) -> Self {
Self {
cache: LruCache::new(CACHE_SIZE),
cache: LruCache::new(cache_size),
}
}
@@ -172,7 +172,7 @@ impl ToArcCommitteeCache for Arc<CommitteeCache> {
impl Default for ShufflingCache {
fn default() -> Self {
Self::new()
Self::new(DEFAULT_CACHE_SIZE)
}
}
@@ -249,7 +249,7 @@ mod test {
fn resolved_promise() {
let (committee_a, _) = committee_caches();
let id_a = shuffling_id(1);
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();
// Create a promise.
let sender = cache.create_promise(id_a.clone()).unwrap();
@@ -276,7 +276,7 @@ mod test {
#[test]
fn unresolved_promise() {
let id_a = shuffling_id(1);
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();
// Create a promise.
let sender = cache.create_promise(id_a.clone()).unwrap();
@@ -301,7 +301,7 @@ mod test {
fn two_promises() {
let (committee_a, committee_b) = committee_caches();
let (id_a, id_b) = (shuffling_id(1), shuffling_id(2));
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();
// Create promise A.
let sender_a = cache.create_promise(id_a.clone()).unwrap();
@@ -355,7 +355,7 @@ mod test {
#[test]
fn too_many_promises() {
let mut cache = ShufflingCache::new();
let mut cache = ShufflingCache::default();
for i in 0..MAX_CONCURRENT_PROMISES {
cache.create_promise(shuffling_id(i as u64)).unwrap();

View File

@@ -500,7 +500,7 @@ async fn unaggregated_attestations_added_to_fork_choice_some_none() {
// Move forward a slot so all queued attestations can be processed.
harness.advance_slot();
fork_choice
.update_time(harness.chain.slot().unwrap(), &harness.chain.spec)
.update_time(harness.chain.slot().unwrap())
.unwrap();
let validator_slots: Vec<(usize, Slot)> = (0..VALIDATOR_COUNT)
@@ -614,7 +614,7 @@ async fn unaggregated_attestations_added_to_fork_choice_all_updated() {
// Move forward a slot so all queued attestations can be processed.
harness.advance_slot();
fork_choice
.update_time(harness.chain.slot().unwrap(), &harness.chain.spec)
.update_time(harness.chain.slot().unwrap())
.unwrap();
let validators: Vec<usize> = (0..VALIDATOR_COUNT).collect();

View File

@@ -54,8 +54,8 @@ use system_health::observe_system_health_bn;
use tokio::sync::mpsc::{Sender, UnboundedSender};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{
Attestation, AttestationData, AttesterSlashing, BeaconStateError, BlindedPayload,
CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload,
Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError,
BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload,
ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof,
SignedBeaconBlock, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
@@ -784,39 +784,112 @@ pub fn serve<T: BeaconChainTypes>(
let current_epoch = state.current_epoch();
let epoch = query.epoch.unwrap_or(current_epoch);
let committee_cache =
match RelativeEpoch::from_epoch(current_epoch, epoch) {
Ok(relative_epoch)
if state
.committee_cache_is_initialized(relative_epoch) =>
{
state.committee_cache(relative_epoch).map(Cow::Borrowed)
}
_ => CommitteeCache::initialized(state, epoch, &chain.spec)
// Attempt to obtain the committee_cache from the beacon chain
let decision_slot = (epoch.saturating_sub(2u64))
.end_slot(T::EthSpec::slots_per_epoch());
// Find the decision block and skip to another method on any kind
// of failure
let shuffling_id = if let Ok(Some(shuffling_decision_block)) =
chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev)
{
Some(AttestationShufflingId {
shuffling_epoch: epoch,
shuffling_decision_block,
})
} else {
None
};
// Attempt to read from the chain cache if there exists a
// shuffling_id
let maybe_cached_shuffling = if let Some(shuffling_id) =
shuffling_id.as_ref()
{
chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
.and_then(|mut cache_write| cache_write.get(shuffling_id))
.and_then(|cache_item| cache_item.wait().ok())
} else {
None
};
let committee_cache = if let Some(ref shuffling) =
maybe_cached_shuffling
{
Cow::Borrowed(&**shuffling)
} else {
let possibly_built_cache =
match RelativeEpoch::from_epoch(current_epoch, epoch) {
Ok(relative_epoch)
if state.committee_cache_is_initialized(
relative_epoch,
) =>
{
state
.committee_cache(relative_epoch)
.map(Cow::Borrowed)
}
_ => CommitteeCache::initialized(
state,
epoch,
&chain.spec,
)
.map(Cow::Owned),
}
.map_err(|e| match e {
BeaconStateError::EpochOutOfBounds => {
let max_sprp =
T::EthSpec::slots_per_historical_root() as u64;
let first_subsequent_restore_point_slot = ((epoch
.start_slot(T::EthSpec::slots_per_epoch())
/ max_sprp)
+ 1)
* max_sprp;
if epoch < current_epoch {
warp_utils::reject::custom_bad_request(format!(
"epoch out of bounds, try state at slot {}",
first_subsequent_restore_point_slot,
))
} else {
warp_utils::reject::custom_bad_request(
"epoch out of bounds, too far in future".into(),
)
}
.map_err(|e| {
match e {
BeaconStateError::EpochOutOfBounds => {
let max_sprp =
T::EthSpec::slots_per_historical_root()
as u64;
let first_subsequent_restore_point_slot =
((epoch.start_slot(
T::EthSpec::slots_per_epoch(),
) / max_sprp)
+ 1)
* max_sprp;
if epoch < current_epoch {
warp_utils::reject::custom_bad_request(
format!(
"epoch out of bounds, \
try state at slot {}",
first_subsequent_restore_point_slot,
),
)
} else {
warp_utils::reject::custom_bad_request(
"epoch out of bounds, \
too far in future"
.into(),
)
}
}
_ => {
warp_utils::reject::beacon_chain_error(e.into())
}
}
})?;
// Attempt to write to the beacon cache (only if the cache
// size is not the default value).
if chain.config.shuffling_cache_size
!= beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE
{
if let Some(shuffling_id) = shuffling_id {
if let Some(mut cache_write) = chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
{
cache_write.insert_committee_cache(
shuffling_id,
&*possibly_built_cache,
);
}
}
_ => warp_utils::reject::beacon_chain_error(e.into()),
})?;
}
possibly_built_cache
};
// Use either the supplied slot or all slots in the epoch.
let slots =

View File

@@ -5,7 +5,7 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = "2021"
[dependencies]
discv5 = { version = "0.1.0", features = ["libp2p"] }
discv5 = { version = "0.2.2", features = ["libp2p"] }
unsigned-varint = { version = "0.6.0", features = ["codec"] }
types = { path = "../../consensus/types" }
eth2_ssz_types = "0.2.2"

View File

@@ -177,6 +177,13 @@ pub struct Discovery<TSpec: EthSpec> {
/// always false.
pub started: bool,
/// This keeps track of whether an external UDP port change should also indicate an internal
/// TCP port change. As we cannot detect our external TCP port, we assume that the external UDP
/// port is also our external TCP port. This assumption only holds if the user has not
/// explicitly set their ENR TCP port via the CLI config. The first indicates tcp4 and the
/// second indicates tcp6.
update_tcp_port: (bool, bool),
/// Logger for the discovery behaviour.
log: slog::Logger,
}
@@ -197,6 +204,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
};
let local_enr = network_globals.local_enr.read().clone();
let local_node_id = local_enr.node_id();
info!(log, "ENR Initialised"; "enr" => local_enr.to_base64(), "seq" => local_enr.seq(), "id"=> %local_enr.node_id(),
"ip4" => ?local_enr.ip4(), "udp4"=> ?local_enr.udp4(), "tcp4" => ?local_enr.tcp6()
@@ -217,6 +225,10 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// Add bootnodes to routing table
for bootnode_enr in config.boot_nodes_enr.clone() {
if bootnode_enr.node_id() == local_node_id {
// If we are a boot node, ignore adding it to the routing table
continue;
}
debug!(
log,
"Adding node to routing table";
@@ -295,6 +307,11 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
}
let update_tcp_port = (
config.enr_tcp4_port.is_none(),
config.enr_tcp6_port.is_none(),
);
Ok(Self {
cached_enrs: LruCache::new(50),
network_globals,
@@ -304,6 +321,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
discv5,
event_stream,
started: !config.disable_discovery,
update_tcp_port,
log,
enr_dir,
})
@@ -1014,6 +1032,13 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
metrics::check_nat();
// Discv5 will have updated our local ENR. We save the updated version
// to disk.
if (self.update_tcp_port.0 && socket_addr.is_ipv4())
|| (self.update_tcp_port.1 && socket_addr.is_ipv6())
{
// Update the TCP port in the ENR
self.discv5.update_local_enr_socket(socket_addr, true);
}
let enr = self.discv5.local_enr();
enr::save_enr_to_disk(Path::new(&self.enr_dir), &enr, &self.log);
// update network globals

View File

@@ -167,7 +167,8 @@ pub fn check_nat() {
}
pub fn scrape_discovery_metrics() {
let metrics = discv5::metrics::Metrics::from(discv5::Discv5::raw_metrics());
let metrics =
discv5::metrics::Metrics::from(discv5::Discv5::<discv5::DefaultProtocolId>::raw_metrics());
set_float_gauge(&DISCOVERY_REQS, metrics.unsolicited_requests_per_second);
set_gauge(&DISCOVERY_SESSIONS, metrics.active_sessions as i64);
set_gauge(&DISCOVERY_SENT_BYTES, metrics.bytes_sent as i64);

View File

@@ -572,6 +572,9 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
}) => {
// Unqueue the attestations we have for this root, if any.
if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) {
let mut sent_count = 0;
let mut failed_to_send_count = 0;
for id in queued_ids {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS,
@@ -596,10 +599,9 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
// Send the work.
if self.ready_work_tx.try_send(work).is_err() {
error!(
log,
"Failed to send scheduled attestation";
);
failed_to_send_count += 1;
} else {
sent_count += 1;
}
} else {
// There is a mismatch between the attestation ids registered for this
@@ -612,6 +614,18 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
);
}
}
if failed_to_send_count > 0 {
error!(
log,
"Ignored scheduled attestation(s) for block";
"hint" => "system may be overloaded",
"parent_root" => ?parent_root,
"block_root" => ?block_root,
"failed_count" => failed_to_send_count,
"sent_count" => sent_count,
);
}
}
// Unqueue the light client optimistic updates we have for this root, if any.
if let Some(queued_lc_id) = self
@@ -726,7 +740,9 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
if self.ready_work_tx.try_send(work).is_err() {
error!(
log,
"Failed to send scheduled attestation";
"Ignored scheduled attestation";
"hint" => "system may be overloaded",
"beacon_block_root" => ?root
);
}

View File

@@ -377,6 +377,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
address of this server (e.g., http://localhost:5054).")
.takes_value(true),
)
.arg(
Arg::with_name("shuffling-cache-size")
.long("shuffling-cache-size")
.help("Some HTTP API requests can be optimised by caching the shufflings at each epoch. \
This flag allows the user to set the shuffling cache size in epochs. \
Shufflings are dependent on validator count and setting this value to a large number can consume a large amount of memory.")
.takes_value(true)
)
/*
* Monitoring metrics
@@ -1000,8 +1008,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
Arg::with_name("count-unrealized")
.long("count-unrealized")
.hidden(true)
.help("Enables an alternative, potentially more performant FFG \
vote tracking method.")
.help("This flag is deprecated and has no effect.")
.takes_value(true)
.default_value("true")
)
@@ -1009,7 +1016,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
Arg::with_name("count-unrealized-full")
.long("count-unrealized-full")
.hidden(true)
.help("Stricter version of `count-unrealized`.")
.help("This flag is deprecated and has no effect.")
.takes_value(true)
.default_value("false")
)

View File

@@ -149,6 +149,10 @@ pub fn get_config<E: EthSpec>(
client_config.http_api.allow_sync_stalled = true;
}
if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? {
client_config.chain.shuffling_cache_size = cache_size;
}
/*
* Prometheus metrics HTTP server
*/
@@ -742,10 +746,21 @@ pub fn get_config<E: EthSpec>(
client_config.chain.fork_choice_before_proposal_timeout_ms = timeout;
}
client_config.chain.count_unrealized =
clap_utils::parse_required(cli_args, "count-unrealized")?;
client_config.chain.count_unrealized_full =
clap_utils::parse_required::<bool>(cli_args, "count-unrealized-full")?.into();
if !clap_utils::parse_required::<bool>(cli_args, "count-unrealized")? {
warn!(
log,
"The flag --count-unrealized is deprecated and will be removed";
"info" => "any use of the flag will have no effect"
);
}
if clap_utils::parse_required::<bool>(cli_args, "count-unrealized-full")? {
warn!(
log,
"The flag --count-unrealized-full is deprecated and will be removed";
"info" => "setting it to `true` has no effect"
);
}
client_config.chain.always_reset_payload_statuses =
cli_args.is_present("reset-payload-statuses");

View File

@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use types::{Checkpoint, Hash256, Slot};
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(15);
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(16);
// All the keys that get stored under the `BeaconMeta` column.
//