Merge branch 'unstable' of https://github.com/sigp/lighthouse into ef-tests-electra

This commit is contained in:
realbigsean
2024-07-16 09:38:29 -07:00
25 changed files with 356 additions and 498 deletions

View File

@@ -3088,14 +3088,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
notify_execution_layer,
)?;
publish_fn()?;
// Record the time it took to complete consensus verification.
if let Some(timestamp) = self.slot_clock.now_duration() {
self.block_times_cache
.write()
.set_time_consensus_verified(block_root, block_slot, timestamp)
}
let executed_block = chain.into_executed_block(execution_pending).await?;
// Record the time it took to ask the execution layer.
if let Some(seen_timestamp) = self.slot_clock.now_duration() {
self.block_times_cache.write().set_execution_time(
block_root,
block_slot,
seen_timestamp,
)
// Record the *additional* time it took to wait for execution layer verification.
if let Some(timestamp) = self.slot_clock.now_duration() {
self.block_times_cache
.write()
.set_time_executed(block_root, block_slot, timestamp)
}
match executed_block {

View File

@@ -19,7 +19,9 @@ type BlockRoot = Hash256;
pub struct Timestamps {
pub observed: Option<Duration>,
pub all_blobs_observed: Option<Duration>,
pub execution_time: Option<Duration>,
pub consensus_verified: Option<Duration>,
pub started_execution: Option<Duration>,
pub executed: Option<Duration>,
pub attestable: Option<Duration>,
pub imported: Option<Duration>,
pub set_as_head: Option<Duration>,
@@ -32,7 +34,9 @@ pub struct BlockDelays {
pub observed: Option<Duration>,
/// The time after the start of the slot we saw all blobs.
pub all_blobs_observed: Option<Duration>,
/// The time it took to get verification from the EL for the block.
/// The time it took to complete consensus verification of the block.
pub consensus_verification_time: Option<Duration>,
/// The time it took to complete execution verification of the block.
pub execution_time: Option<Duration>,
/// The delay from the start of the slot before the block became available
///
@@ -58,13 +62,16 @@ impl BlockDelays {
let all_blobs_observed = times
.all_blobs_observed
.and_then(|all_blobs_observed| all_blobs_observed.checked_sub(slot_start_time));
let consensus_verification_time = times
.consensus_verified
.and_then(|consensus_verified| consensus_verified.checked_sub(times.observed?));
let execution_time = times
.execution_time
.and_then(|execution_time| execution_time.checked_sub(times.observed?));
.executed
.and_then(|executed| executed.checked_sub(times.started_execution?));
// Duration since UNIX epoch at which block became available.
let available_time = times.execution_time.map(|execution_time| {
std::cmp::max(execution_time, times.all_blobs_observed.unwrap_or_default())
});
let available_time = times
.executed
.map(|executed| std::cmp::max(executed, times.all_blobs_observed.unwrap_or_default()));
// Duration from the start of the slot until the block became available.
let available_delay =
available_time.and_then(|available_time| available_time.checked_sub(slot_start_time));
@@ -80,6 +87,7 @@ impl BlockDelays {
BlockDelays {
observed,
all_blobs_observed,
consensus_verification_time,
execution_time,
available: available_delay,
attestable,
@@ -155,6 +163,9 @@ impl BlockTimesCache {
slot: Slot,
timestamp: Duration,
) {
// Unlike other functions in this file, we update the blob observed time only if it is
// *greater* than existing blob observation times. This allows us to know the observation
// time of the last blob to arrive.
let block_times = self
.cache
.entry(block_root)
@@ -168,48 +179,89 @@ impl BlockTimesCache {
}
}
pub fn set_execution_time(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
/// Set the timestamp for `field` if that timestamp is less than any previously known value.
///
/// If no previous value is known for the field, then the supplied timestamp will always be
/// stored.
pub fn set_time_if_less(
&mut self,
block_root: BlockRoot,
slot: Slot,
field: impl Fn(&mut Timestamps) -> &mut Option<Duration>,
timestamp: Duration,
) {
let block_times = self
.cache
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
if block_times
.timestamps
.execution_time
.map_or(true, |prev| timestamp < prev)
{
block_times.timestamps.execution_time = Some(timestamp);
let existing_timestamp = field(&mut block_times.timestamps);
if existing_timestamp.map_or(true, |prev| timestamp < prev) {
*existing_timestamp = Some(timestamp);
}
}
pub fn set_time_consensus_verified(
&mut self,
block_root: BlockRoot,
slot: Slot,
timestamp: Duration,
) {
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.consensus_verified,
timestamp,
)
}
pub fn set_time_executed(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.executed,
timestamp,
)
}
pub fn set_time_started_execution(
&mut self,
block_root: BlockRoot,
slot: Slot,
timestamp: Duration,
) {
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.started_execution,
timestamp,
)
}
pub fn set_time_attestable(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
let block_times = self
.cache
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
if block_times
.timestamps
.attestable
.map_or(true, |prev| timestamp < prev)
{
block_times.timestamps.attestable = Some(timestamp);
}
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.attestable,
timestamp,
)
}
pub fn set_time_imported(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
let block_times = self
.cache
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
block_times.timestamps.imported = Some(timestamp);
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.imported,
timestamp,
)
}
pub fn set_time_set_as_head(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
let block_times = self
.cache
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
block_times.timestamps.set_as_head = Some(timestamp);
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.set_as_head,
timestamp,
)
}
pub fn get_block_delays(

View File

@@ -67,7 +67,7 @@ use crate::{
metrics, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use derivative::Derivative;
use eth2::types::{EventKind, PublishBlockRequest};
use eth2::types::{BlockGossip, EventKind, PublishBlockRequest};
use execution_layer::PayloadStatus;
pub use fork_choice::{AttestationFromBlock, PayloadVerificationStatus};
use parking_lot::RwLockReadGuard;
@@ -974,6 +974,16 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
// Validate the block's execution_payload (if any).
validate_execution_payload_for_gossip(&parent_block, block.message(), chain)?;
// Beacon API block_gossip events
if let Some(event_handler) = chain.event_handler.as_ref() {
if event_handler.has_block_gossip_subscribers() {
event_handler.register(EventKind::BlockGossip(Box::new(BlockGossip {
slot: block.slot(),
block: block_root,
})));
}
}
// Having checked the proposer index and the block root we can cache them.
let consensus_context = ConsensusContext::new(block.slot())
.set_current_block_root(block_root)
@@ -1334,6 +1344,13 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
// The specification declares that this should be run *inside* `per_block_processing`,
// however we run it here to keep `per_block_processing` pure (i.e., no calls to external
// servers).
if let Some(started_execution) = chain.slot_clock.now_duration() {
chain.block_times_cache.write().set_time_started_execution(
block_root,
block.slot(),
started_execution,
);
}
let payload_verification_status = payload_notifier.notify_new_payload().await?;
// If the payload did not validate or invalidate the block, check to see if this block is

View File

@@ -1385,6 +1385,15 @@ fn observe_head_block_delays<E: EthSpec, S: SlotClock>(
.as_millis() as i64,
);
// The time it took to check the validity within Lighthouse
metrics::set_gauge(
&metrics::BEACON_BLOCK_DELAY_CONSENSUS_VERIFICATION_TIME,
block_delays
.consensus_verification_time
.unwrap_or_else(|| Duration::from_secs(0))
.as_millis() as i64,
);
// The time it took to check the validity with the EL
metrics::set_gauge(
&metrics::BEACON_BLOCK_DELAY_EXECUTION_TIME,
@@ -1447,6 +1456,7 @@ fn observe_head_block_delays<E: EthSpec, S: SlotClock>(
"total_delay_ms" => block_delay_total.as_millis(),
"observed_delay_ms" => format_delay(&block_delays.observed),
"blob_delay_ms" => format_delay(&block_delays.all_blobs_observed),
"consensus_time_ms" => format_delay(&block_delays.consensus_verification_time),
"execution_time_ms" => format_delay(&block_delays.execution_time),
"available_delay_ms" => format_delay(&block_delays.available),
"attestable_delay_ms" => format_delay(&block_delays.attestable),
@@ -1463,6 +1473,7 @@ fn observe_head_block_delays<E: EthSpec, S: SlotClock>(
"total_delay_ms" => block_delay_total.as_millis(),
"observed_delay_ms" => format_delay(&block_delays.observed),
"blob_delay_ms" => format_delay(&block_delays.all_blobs_observed),
"consensus_time_ms" => format_delay(&block_delays.consensus_verification_time),
"execution_time_ms" => format_delay(&block_delays.execution_time),
"available_delay_ms" => format_delay(&block_delays.available),
"attestable_delay_ms" => format_delay(&block_delays.attestable),

View File

@@ -23,6 +23,7 @@ pub struct ServerSentEventHandler<E: EthSpec> {
proposer_slashing_tx: Sender<EventKind<E>>,
attester_slashing_tx: Sender<EventKind<E>>,
bls_to_execution_change_tx: Sender<EventKind<E>>,
block_gossip_tx: Sender<EventKind<E>>,
log: Logger,
}
@@ -51,6 +52,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
let (proposer_slashing_tx, _) = broadcast::channel(capacity);
let (attester_slashing_tx, _) = broadcast::channel(capacity);
let (bls_to_execution_change_tx, _) = broadcast::channel(capacity);
let (block_gossip_tx, _) = broadcast::channel(capacity);
Self {
attestation_tx,
@@ -69,6 +71,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
proposer_slashing_tx,
attester_slashing_tx,
bls_to_execution_change_tx,
block_gossip_tx,
log,
}
}
@@ -147,6 +150,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
.bls_to_execution_change_tx
.send(kind)
.map(|count| log_count("bls to execution change", count)),
EventKind::BlockGossip(_) => self
.block_gossip_tx
.send(kind)
.map(|count| log_count("block gossip", count)),
};
if let Err(SendError(event)) = result {
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
@@ -217,6 +224,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.bls_to_execution_change_tx.subscribe()
}
pub fn subscribe_block_gossip(&self) -> Receiver<EventKind<E>> {
self.block_gossip_tx.subscribe()
}
pub fn has_attestation_subscribers(&self) -> bool {
self.attestation_tx.receiver_count() > 0
}
@@ -272,4 +283,8 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
pub fn has_bls_to_execution_change_subscribers(&self) -> bool {
self.bls_to_execution_change_tx.receiver_count() > 0
}
pub fn has_block_gossip_subscribers(&self) -> bool {
self.block_gossip_tx.receiver_count() > 0
}
}

View File

@@ -857,6 +857,11 @@ lazy_static! {
"Duration between the start of the block's slot and the time the block was observed.",
);
pub static ref BEACON_BLOCK_DELAY_CONSENSUS_VERIFICATION_TIME: Result<IntGauge> = try_create_int_gauge(
"beacon_block_delay_consensus_verification_time",
"The time taken to verify the block within Lighthouse",
);
pub static ref BEACON_BLOCK_DELAY_EXECUTION_TIME: Result<IntGauge> = try_create_int_gauge(
"beacon_block_delay_execution_time",
"The duration in verifying the block with the execution layer.",