Merge branch 'capella' of https://github.com/sigp/lighthouse into eip4844

# Conflicts:
#	beacon_node/beacon_chain/src/beacon_chain.rs
#	beacon_node/beacon_chain/src/block_verification.rs
#	beacon_node/beacon_chain/src/test_utils.rs
#	beacon_node/execution_layer/src/engine_api.rs
#	beacon_node/execution_layer/src/engine_api/http.rs
#	beacon_node/execution_layer/src/lib.rs
#	beacon_node/execution_layer/src/test_utils/handle_rpc.rs
#	beacon_node/http_api/src/lib.rs
#	beacon_node/http_api/tests/fork_tests.rs
#	beacon_node/network/src/beacon_processor/mod.rs
#	beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs
#	beacon_node/network/src/beacon_processor/worker/sync_methods.rs
#	beacon_node/operation_pool/src/bls_to_execution_changes.rs
#	beacon_node/operation_pool/src/lib.rs
#	beacon_node/operation_pool/src/persistence.rs
#	consensus/serde_utils/src/u256_hex_be_opt.rs
#	testing/antithesis/Dockerfile.libvoidstar
This commit is contained in:
realbigsean
2023-02-07 12:12:56 -05:00
53 changed files with 1680 additions and 560 deletions

View File

@@ -0,0 +1,322 @@
use crate::*;
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use slog::{debug, info, warn, Logger};
use slot_clock::SlotClock;
use std::cmp;
use std::collections::HashSet;
use std::mem;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::sleep;
use types::EthSpec;
/// The size of each chunk of addresses changes to be broadcast at the Capella
/// fork.
const BROADCAST_CHUNK_SIZE: usize = 128;
/// The delay between broadcasting each chunk.
const BROADCAST_CHUNK_DELAY: Duration = Duration::from_millis(500);
/// If the Capella fork has already been reached, `broadcast_address_changes` is
/// called immediately.
///
/// If the Capella fork has not been reached, waits until the start of the fork
/// epoch and then calls `broadcast_address_changes`.
pub async fn broadcast_address_changes_at_capella<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
network_send: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: &Logger,
) {
let spec = &chain.spec;
let slot_clock = &chain.slot_clock;
let capella_fork_slot = if let Some(epoch) = spec.capella_fork_epoch {
epoch.start_slot(T::EthSpec::slots_per_epoch())
} else {
// Exit now if Capella is not defined.
return;
};
// Wait until the Capella fork epoch.
while chain.slot().map_or(true, |slot| slot < capella_fork_slot) {
match slot_clock.duration_to_slot(capella_fork_slot) {
Some(duration) => {
// Sleep until the Capella fork.
sleep(duration).await;
break;
}
None => {
// We were unable to read the slot clock wait another slot
// and then try again.
sleep(slot_clock.slot_duration()).await;
}
}
}
// The following function will be called in two scenarios:
//
// 1. The node has been running for some time and the Capella fork has just
// been reached.
// 2. The node has just started and it is *after* the Capella fork.
broadcast_address_changes(chain, network_send, log).await
}
/// Broadcasts any address changes that are flagged for broadcasting at the
/// Capella fork epoch.
///
/// Address changes are published in chunks, with a delay between each chunk.
/// This helps reduce the load on the P2P network and also helps prevent us from
/// clogging our `network_send` channel and being late to publish
/// blocks, attestations, etc.
pub async fn broadcast_address_changes<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
network_send: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: &Logger,
) {
let head = chain.head_snapshot();
let mut changes = chain
.op_pool
.get_bls_to_execution_changes_received_pre_capella(&head.beacon_state, &chain.spec);
while !changes.is_empty() {
// This `split_off` approach is to allow us to have owned chunks of the
// `changes` vec. The `std::slice::Chunks` method uses references and
// the `itertools` iterator that achives this isn't `Send` so it doesn't
// work well with the `sleep` at the end of the loop.
let tail = changes.split_off(cmp::min(BROADCAST_CHUNK_SIZE, changes.len()));
let chunk = mem::replace(&mut changes, tail);
let mut published_indices = HashSet::with_capacity(BROADCAST_CHUNK_SIZE);
let mut num_ok = 0;
let mut num_err = 0;
// Publish each individual address change.
for address_change in chunk {
let validator_index = address_change.message.validator_index;
let pubsub_message = PubsubMessage::BlsToExecutionChange(Box::new(address_change));
let message = NetworkMessage::Publish {
messages: vec![pubsub_message],
};
// It seems highly unlikely that this unbounded send will fail, but
// we handle the result nontheless.
if let Err(e) = network_send.send(message) {
debug!(
log,
"Failed to publish change message";
"error" => ?e,
"validator_index" => validator_index
);
num_err += 1;
} else {
debug!(
log,
"Published address change message";
"validator_index" => validator_index
);
num_ok += 1;
published_indices.insert(validator_index);
}
}
// Remove any published indices from the list of indices that need to be
// published.
chain
.op_pool
.register_indices_broadcasted_at_capella(&published_indices);
info!(
log,
"Published address change messages";
"num_published" => num_ok,
);
if num_err > 0 {
warn!(
log,
"Failed to publish address changes";
"info" => "failed messages will be retried",
"num_unable_to_publish" => num_err,
);
}
sleep(BROADCAST_CHUNK_DELAY).await;
}
debug!(
log,
"Address change routine complete";
);
}
#[cfg(not(debug_assertions))] // Tests run too slow in debug.
#[cfg(test)]
mod tests {
use super::*;
use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType};
use operation_pool::ReceivedPreCapella;
use state_processing::{SigVerifiedOp, VerifyOperation};
use std::collections::HashSet;
use tokio::sync::mpsc;
use types::*;
type E = MainnetEthSpec;
pub const VALIDATOR_COUNT: usize = BROADCAST_CHUNK_SIZE * 3;
pub const EXECUTION_ADDRESS: Address = Address::repeat_byte(42);
struct Tester {
harness: BeaconChainHarness<EphemeralHarnessType<E>>,
/// Changes which should be broadcast at the Capella fork.
received_pre_capella_changes: Vec<SigVerifiedOp<SignedBlsToExecutionChange, E>>,
/// Changes which should *not* be broadcast at the Capella fork.
not_received_pre_capella_changes: Vec<SigVerifiedOp<SignedBlsToExecutionChange, E>>,
}
impl Tester {
fn new() -> Self {
let altair_fork_epoch = Epoch::new(0);
let bellatrix_fork_epoch = Epoch::new(0);
let capella_fork_epoch = Epoch::new(2);
let mut spec = E::default_spec();
spec.altair_fork_epoch = Some(altair_fork_epoch);
spec.bellatrix_fork_epoch = Some(bellatrix_fork_epoch);
spec.capella_fork_epoch = Some(capella_fork_epoch);
let harness = BeaconChainHarness::builder(E::default())
.spec(spec)
.logger(logging::test_logger())
.deterministic_keypairs(VALIDATOR_COUNT)
.deterministic_withdrawal_keypairs(VALIDATOR_COUNT)
.fresh_ephemeral_store()
.mock_execution_layer()
.build();
Self {
harness,
received_pre_capella_changes: <_>::default(),
not_received_pre_capella_changes: <_>::default(),
}
}
fn produce_verified_address_change(
&self,
validator_index: u64,
) -> SigVerifiedOp<SignedBlsToExecutionChange, E> {
let change = self
.harness
.make_bls_to_execution_change(validator_index, EXECUTION_ADDRESS);
let head = self.harness.chain.head_snapshot();
change
.validate(&head.beacon_state, &self.harness.spec)
.unwrap()
}
fn produce_received_pre_capella_changes(mut self, indices: Vec<u64>) -> Self {
for validator_index in indices {
self.received_pre_capella_changes
.push(self.produce_verified_address_change(validator_index));
}
self
}
fn produce_not_received_pre_capella_changes(mut self, indices: Vec<u64>) -> Self {
for validator_index in indices {
self.not_received_pre_capella_changes
.push(self.produce_verified_address_change(validator_index));
}
self
}
async fn run(self) {
let harness = self.harness;
let chain = harness.chain.clone();
let mut broadcast_indices = HashSet::new();
for change in self.received_pre_capella_changes {
broadcast_indices.insert(change.as_inner().message.validator_index);
chain
.op_pool
.insert_bls_to_execution_change(change, ReceivedPreCapella::Yes);
}
let mut non_broadcast_indices = HashSet::new();
for change in self.not_received_pre_capella_changes {
non_broadcast_indices.insert(change.as_inner().message.validator_index);
chain
.op_pool
.insert_bls_to_execution_change(change, ReceivedPreCapella::No);
}
harness.set_current_slot(
chain
.spec
.capella_fork_epoch
.unwrap()
.start_slot(E::slots_per_epoch()),
);
let (sender, mut receiver) = mpsc::unbounded_channel();
broadcast_address_changes_at_capella(&chain, sender, &logging::test_logger()).await;
let mut broadcasted_changes = vec![];
while let Some(NetworkMessage::Publish { mut messages }) = receiver.recv().await {
match messages.pop().unwrap() {
PubsubMessage::BlsToExecutionChange(change) => broadcasted_changes.push(change),
_ => panic!("unexpected message"),
}
}
assert_eq!(
broadcasted_changes.len(),
broadcast_indices.len(),
"all expected changes should have been broadcast"
);
for broadcasted in &broadcasted_changes {
assert!(
!non_broadcast_indices.contains(&broadcasted.message.validator_index),
"messages not flagged for broadcast should not have been broadcast"
);
}
let head = chain.head_snapshot();
assert!(
chain
.op_pool
.get_bls_to_execution_changes_received_pre_capella(
&head.beacon_state,
&chain.spec,
)
.is_empty(),
"there shouldn't be any capella broadcast changes left in the op pool"
);
}
}
// Useful for generating even-numbered indices. Required since only even
// numbered genesis validators have BLS credentials.
fn even_indices(start: u64, count: usize) -> Vec<u64> {
(start..).filter(|i| i % 2 == 0).take(count).collect()
}
#[tokio::test]
async fn one_chunk() {
Tester::new()
.produce_received_pre_capella_changes(even_indices(0, 4))
.produce_not_received_pre_capella_changes(even_indices(10, 4))
.run()
.await;
}
#[tokio::test]
async fn multiple_chunks() {
Tester::new()
.produce_received_pre_capella_changes(even_indices(0, BROADCAST_CHUNK_SIZE * 3 / 2))
.run()
.await;
}
}

View File

@@ -1,3 +1,4 @@
use crate::address_change_broadcast::broadcast_address_changes_at_capella;
use crate::config::{ClientGenesis, Config as ClientConfig};
use crate::notifier::spawn_notifier;
use crate::Client;
@@ -154,7 +155,6 @@ where
config,
context.executor.clone(),
context.log().clone(),
&spec,
)
.map_err(|e| format!("unable to start execution layer endpoints: {:?}", e))?;
Some(execution_layer)
@@ -809,6 +809,25 @@ where
// Spawns a routine that polls the `exchange_transition_configuration` endpoint.
execution_layer.spawn_transition_configuration_poll(beacon_chain.spec.clone());
}
// Spawn a service to publish BLS to execution changes at the Capella fork.
if let Some(network_senders) = self.network_senders {
let inner_chain = beacon_chain.clone();
let broadcast_context =
runtime_context.service_context("addr_bcast".to_string());
let log = broadcast_context.log().clone();
broadcast_context.executor.spawn(
async move {
broadcast_address_changes_at_capella(
&inner_chain,
network_senders.network_send(),
&log,
)
.await
},
"addr_broadcast",
);
}
}
start_proposer_prep_service(runtime_context.executor.clone(), beacon_chain.clone());

View File

@@ -1,5 +1,6 @@
extern crate slog;
mod address_change_broadcast;
pub mod config;
mod metrics;
mod notifier;

View File

@@ -1,5 +1,6 @@
use crate::metrics;
use beacon_chain::{
capella_readiness::CapellaReadiness,
merge_readiness::{MergeConfig, MergeReadiness},
BeaconChain, BeaconChainTypes, ExecutionStatus,
};
@@ -313,6 +314,7 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
eth1_logging(&beacon_chain, &log);
merge_readiness_logging(current_slot, &beacon_chain, &log).await;
capella_readiness_logging(current_slot, &beacon_chain, &log).await;
}
};
@@ -350,12 +352,15 @@ async fn merge_readiness_logging<T: BeaconChainTypes>(
}
if merge_completed && !has_execution_layer {
error!(
log,
"Execution endpoint required";
"info" => "you need an execution engine to validate blocks, see: \
https://lighthouse-book.sigmaprime.io/merge-migration.html"
);
if !beacon_chain.is_time_to_prepare_for_capella(current_slot) {
// logging of the EE being offline is handled in `capella_readiness_logging()`
error!(
log,
"Execution endpoint required";
"info" => "you need an execution engine to validate blocks, see: \
https://lighthouse-book.sigmaprime.io/merge-migration.html"
);
}
return;
}
@@ -419,6 +424,60 @@ async fn merge_readiness_logging<T: BeaconChainTypes>(
}
}
/// Provides some helpful logging to users to indicate if their node is ready for Capella
async fn capella_readiness_logging<T: BeaconChainTypes>(
current_slot: Slot,
beacon_chain: &BeaconChain<T>,
log: &Logger,
) {
let capella_completed = beacon_chain
.canonical_head
.cached_head()
.snapshot
.beacon_block
.message()
.body()
.execution_payload()
.map_or(false, |payload| payload.withdrawals_root().is_ok());
let has_execution_layer = beacon_chain.execution_layer.is_some();
if capella_completed && has_execution_layer
|| !beacon_chain.is_time_to_prepare_for_capella(current_slot)
{
return;
}
if capella_completed && !has_execution_layer {
error!(
log,
"Execution endpoint required";
"info" => "you need a Capella enabled execution engine to validate blocks, see: \
https://lighthouse-book.sigmaprime.io/merge-migration.html"
);
return;
}
match beacon_chain.check_capella_readiness().await {
CapellaReadiness::Ready => {
info!(log, "Ready for Capella")
}
readiness @ CapellaReadiness::ExchangeCapabilitiesFailed { error: _ } => {
error!(
log,
"Not ready for Capella";
"info" => %readiness,
"hint" => "try updating Lighthouse and/or the execution layer",
)
}
readiness => warn!(
log,
"Not ready for Capella";
"info" => %readiness,
),
}
}
fn eth1_logging<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>, log: &Logger) {
let current_slot_opt = beacon_chain.slot().ok();