Log block import source (#5738)

* the default target peers is 100

* add some comments

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into track-block-import-source

* add block import source

* revert

* update logging text

* fix tests

* lint

* use % instaed of to_string
This commit is contained in:
Eitan Seri-Levi
2024-05-15 12:17:06 +03:00
committed by GitHub
parent 6f45ad4534
commit 6636167503
12 changed files with 74 additions and 15 deletions

View File

@@ -2774,6 +2774,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
signature_verified_block.block_root(),
signature_verified_block,
notify_execution_layer,
BlockImportSource::RangeSync,
|| Ok(()),
)
.await
@@ -2956,6 +2957,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
block_root: Hash256,
unverified_block: B,
block_source: BlockImportSource,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
self.reqresp_pre_import_cache
@@ -2963,9 +2965,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.insert(block_root, unverified_block.block_cloned());
let r = self
.process_block(block_root, unverified_block, notify_execution_layer, || {
Ok(())
})
.process_block(
block_root,
unverified_block,
notify_execution_layer,
block_source,
|| Ok(()),
)
.await;
self.remove_notified(&block_root, r)
}
@@ -2988,6 +2994,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
unverified_block: B,
notify_execution_layer: NotifyExecutionLayer,
block_source: BlockImportSource,
publish_fn: impl FnOnce() -> Result<(), BlockError<T::EthSpec>> + Send + 'static,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
// Start the Prometheus timer.
@@ -3048,6 +3055,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"Beacon block imported";
"block_root" => ?block_root,
"block_slot" => block_slot,
"source" => %block_source,
);
// Increment the Prometheus counter for block processing successes.

View File

@@ -1881,6 +1881,7 @@ where
block_root,
RpcBlock::new(Some(block_root), block, sidecars).unwrap(),
NotifyExecutionLayer::Yes,
BlockImportSource::RangeSync,
|| Ok(()),
)
.await?
@@ -1907,6 +1908,7 @@ where
block_root,
RpcBlock::new(Some(block_root), block, sidecars).unwrap(),
NotifyExecutionLayer::Yes,
BlockImportSource::RangeSync,
|| Ok(()),
)
.await?

View File

@@ -473,6 +473,7 @@ async fn assert_invalid_signature(
)
.unwrap(),
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await;
@@ -541,6 +542,7 @@ async fn invalid_signature_gossip_block() {
signed_block.canonical_root(),
Arc::new(signed_block),
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await,
@@ -875,6 +877,7 @@ async fn block_gossip_verification() {
gossip_verified.block_root,
gossip_verified,
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await
@@ -1165,6 +1168,7 @@ async fn verify_block_for_gossip_slashing_detection() {
verified_block.block_root,
verified_block,
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await
@@ -1196,6 +1200,7 @@ async fn verify_block_for_gossip_doppelganger_detection() {
verified_block.block_root,
verified_block,
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await
@@ -1342,6 +1347,7 @@ async fn add_base_block_to_altair_chain() {
base_block.canonical_root(),
Arc::new(base_block.clone()),
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await
@@ -1477,6 +1483,7 @@ async fn add_altair_block_to_base_chain() {
altair_block.canonical_root(),
Arc::new(altair_block.clone()),
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await

View File

@@ -702,6 +702,7 @@ async fn invalidates_all_descendants() {
fork_block.canonical_root(),
fork_block,
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await
@@ -802,6 +803,7 @@ async fn switches_heads() {
fork_block.canonical_root(),
fork_block,
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await
@@ -1061,7 +1063,7 @@ async fn invalid_parent() {
// Ensure the block built atop an invalid payload is invalid for import.
assert!(matches!(
rig.harness.chain.process_block(block.canonical_root(), block.clone(), NotifyExecutionLayer::Yes,
rig.harness.chain.process_block(block.canonical_root(), block.clone(), NotifyExecutionLayer::Yes, BlockImportSource::Lookup,
|| Ok(()),
).await,
Err(BlockError::ParentExecutionPayloadInvalid { parent_root: invalid_root })
@@ -1352,6 +1354,7 @@ async fn build_optimistic_chain(
block.canonical_root(),
block,
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await
@@ -1926,6 +1929,7 @@ async fn recover_from_invalid_head_by_importing_blocks() {
fork_block.canonical_root(),
fork_block.clone(),
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await

View File

@@ -2458,6 +2458,7 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
full_block.canonical_root(),
RpcBlock::new(Some(block_root), Arc::new(full_block), Some(blobs)).unwrap(),
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await
@@ -2676,6 +2677,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() {
invalid_fork_block.canonical_root(),
invalid_fork_block.clone(),
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await
@@ -2689,6 +2691,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() {
valid_fork_block.canonical_root(),
valid_fork_block.clone(),
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await

View File

@@ -12,7 +12,8 @@ use lazy_static::lazy_static;
use operation_pool::PersistedOperationPool;
use state_processing::{per_slot_processing, per_slot_processing::Error as SlotProcessingError};
use types::{
BeaconState, BeaconStateError, EthSpec, Hash256, Keypair, MinimalEthSpec, RelativeEpoch, Slot,
BeaconState, BeaconStateError, BlockImportSource, EthSpec, Hash256, Keypair, MinimalEthSpec,
RelativeEpoch, Slot,
};
// Should ideally be divisible by 3.
@@ -686,6 +687,7 @@ async fn run_skip_slot_test(skip_slots: u64) {
harness_a.chain.head_snapshot().beacon_block_root,
harness_a.get_head_block(),
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await

View File

@@ -19,8 +19,8 @@ use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tree_hash::TreeHash;
use types::{
AbstractExecPayload, BeaconBlockRef, BlobSidecarList, EthSpec, ExecPayload, ExecutionBlockHash,
ForkName, FullPayload, FullPayloadBellatrix, Hash256, SignedBeaconBlock,
AbstractExecPayload, BeaconBlockRef, BlobSidecarList, BlockImportSource, EthSpec, ExecPayload,
ExecutionBlockHash, ForkName, FullPayload, FullPayloadBellatrix, Hash256, SignedBeaconBlock,
SignedBlindedBeaconBlock, VariableList,
};
use warp::http::StatusCode;
@@ -230,6 +230,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
block_root,
gossip_verified_block,
NotifyExecutionLayer::Yes,
BlockImportSource::HttpApi,
publish_fn,
))
.await

View File

@@ -31,8 +31,8 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::HotColdDBError;
use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, BlobSidecar, EthSpec, Hash256, IndexedAttestation,
LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing,
beacon_block::BlockImportSource, Attestation, AttesterSlashing, BlobSidecar, EthSpec, Hash256,
IndexedAttestation, LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing,
SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage,
SyncSubnetId,
@@ -1141,9 +1141,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block = verified_block.block.block_cloned();
let block_root = verified_block.block_root;
// TODO(block source)
let result = self
.chain
.process_block_with_early_caching(block_root, verified_block, NotifyExecutionLayer::Yes)
.process_block_with_early_caching(
block_root,
verified_block,
BlockImportSource::Gossip,
NotifyExecutionLayer::Yes,
)
.await;
match &result {

View File

@@ -24,6 +24,7 @@ use store::KzgCommitment;
use tokio::sync::mpsc;
use types::beacon_block_body::format_kzg_commitments;
use types::blob_sidecar::FixedBlobSidecarList;
use types::BlockImportSource;
use types::{Epoch, Hash256};
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
@@ -153,7 +154,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let result = self
.chain
.process_block_with_early_caching(block_root, block, NotifyExecutionLayer::Yes)
.process_block_with_early_caching(
block_root,
block,
BlockImportSource::Lookup,
NotifyExecutionLayer::Yes,
)
.await;
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);

View File

@@ -4,6 +4,7 @@ use derivative::Derivative;
use serde::{Deserialize, Serialize};
use ssz::{Decode, DecodeError};
use ssz_derive::{Decode, Encode};
use std::fmt;
use std::marker::PhantomData;
use superstruct::superstruct;
use test_random_derive::TestRandom;
@@ -836,6 +837,23 @@ impl<E: EthSpec, Payload: AbstractExecPayload<E>> ForkVersionDeserialize
))
}
}
pub enum BlockImportSource {
Gossip,
Lookup,
RangeSync,
HttpApi,
}
impl fmt::Display for BlockImportSource {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
BlockImportSource::Gossip => write!(f, "gossip"),
BlockImportSource::Lookup => write!(f, "lookup"),
BlockImportSource::RangeSync => write!(f, "range_sync"),
BlockImportSource::HttpApi => write!(f, "http_api"),
}
}
}
#[cfg(test)]
mod tests {

View File

@@ -122,7 +122,7 @@ pub use crate::attester_slashing::AttesterSlashing;
pub use crate::beacon_block::{
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockBellatrix, BeaconBlockCapella,
BeaconBlockDeneb, BeaconBlockElectra, BeaconBlockRef, BeaconBlockRefMut, BlindedBeaconBlock,
EmptyBlock,
BlockImportSource, EmptyBlock,
};
pub use crate::beacon_block_body::{
BeaconBlockBody, BeaconBlockBodyAltair, BeaconBlockBodyBase, BeaconBlockBodyBellatrix,

View File

@@ -24,9 +24,9 @@ use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use types::{
Attestation, AttesterSlashing, BeaconBlock, BeaconState, BlobSidecar, BlobsList, Checkpoint,
ExecutionBlockHash, Hash256, IndexedAttestation, KzgProof, ProposerPreparationData,
SignedBeaconBlock, Slot, Uint256,
Attestation, AttesterSlashing, BeaconBlock, BeaconState, BlobSidecar, BlobsList,
BlockImportSource, Checkpoint, ExecutionBlockHash, Hash256, IndexedAttestation, KzgProof,
ProposerPreparationData, SignedBeaconBlock, Slot, Uint256,
};
#[derive(Default, Debug, PartialEq, Clone, Deserialize, Decode)]
@@ -498,6 +498,7 @@ impl<E: EthSpec> Tester<E> {
block_root,
block.clone(),
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
))?
.map(|avail: AvailabilityProcessingStatus| avail.try_into());