Log stuck lookups (#5778)

* Log stuck lookups every interval

* Implement debug manually

* Add comment

* Do not print peers twice

* Add SYNC_LOOKUPS_STUCK metric

* Skip logging request root

* use derivative

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into log-stuck-lookups

* add req id to debug

* Merge remote-tracking branch 'sigp/unstable' into log-stuck-lookups

* Fix conflict with unstable
This commit is contained in:
Lion - dapplion
2024-05-14 20:34:26 +03:00
committed by GitHub
parent 683d9df63b
commit 6f45ad4534
4 changed files with 73 additions and 3 deletions

View File

@@ -30,6 +30,7 @@ mod tests;
const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4;
/// Age, in seconds, beyond which `log_stuck_lookups` reports a lookup as possibly stuck.
const LOOKUP_MAX_DURATION_SECS: u64 = 60;
pub enum BlockComponent<E: EthSpec> {
Block(DownloadResult<Arc<SignedBeaconBlock<E>>>),
@@ -665,4 +666,21 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.single_block_lookups.len() as i64,
);
}
/// Reports every single-block lookup that has existed for longer than
/// `LOOKUP_MAX_DURATION_SECS`.
///
/// One debug-level entry is emitted per such lookup, and the `SYNC_LOOKUPS_STUCK` gauge is
/// set to the number of entries emitted in this pass (zero when no lookup exceeds the limit).
pub fn log_stuck_lookups(&self) {
    let max_duration = Duration::from_secs(LOOKUP_MAX_DURATION_SECS);
    let stuck_lookups = self
        .single_block_lookups
        .values()
        .filter(|lookup| lookup.elapsed_since_created() > max_duration);

    let mut stuck_count = 0;
    for lookup in stuck_lookups {
        // `id` and `block_root` are also part of the summary. Emitting them as separate
        // fields allows log parsers to index them and gives better search.
        debug!(self.log, "Lookup maybe stuck";
            "id" => lookup.id,
            "block_root" => ?lookup.block_root(),
            "summary" => ?lookup,
        );
        stuck_count += 1;
    }
    metrics::set_gauge(&metrics::SYNC_LOOKUPS_STUCK, stuck_count);
}
}

View File

@@ -4,12 +4,13 @@ use crate::sync::block_lookups::common::RequestState;
use crate::sync::block_lookups::Id;
use crate::sync::network_context::{LookupRequestResult, ReqId, SyncNetworkContext};
use beacon_chain::BeaconChainTypes;
use derivative::Derivative;
use itertools::Itertools;
use rand::seq::IteratorRandom;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use store::Hash256;
use strum::IntoStaticStr;
use types::blob_sidecar::FixedBlobSidecarList;
@@ -53,12 +54,15 @@ pub enum LookupRequestError {
},
}
/// A lookup for a single block root, holding the request state for both its block and its
/// blobs.
#[derive(Derivative)]
// The explicit bound replaces the bounds `derivative` would otherwise generate for `Debug`.
#[derivative(Debug(bound = "T: BeaconChainTypes"))]
pub struct SingleBlockLookup<T: BeaconChainTypes> {
/// Identifier of this lookup.
pub id: Id,
/// Request state of the block component.
pub block_request_state: BlockRequestState<T::EthSpec>,
/// Request state of the blob component.
pub blob_request_state: BlobRequestState<T::EthSpec>,
/// Block root this lookup was created for.
block_root: Hash256,
/// NOTE(review): presumably the root of a parent this lookup is waiting on, `None` when it
/// is not waiting — confirm against the constructor's callers.
awaiting_parent: Option<Hash256>,
/// Instant at which this lookup was constructed; read by `elapsed_since_created`.
created: Instant,
}
impl<T: BeaconChainTypes> SingleBlockLookup<T> {
@@ -74,6 +78,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
blob_request_state: BlobRequestState::new(requested_block_root, peers),
block_root: requested_block_root,
awaiting_parent,
created: Instant::now(),
}
}
@@ -98,6 +103,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.awaiting_parent = None;
}
/// Returns the time elapsed since this lookup was created
pub fn elapsed_since_created(&self) -> Duration {
self.created.elapsed()
}
/// Maybe insert a verified response into this lookup. Returns true if imported
pub fn add_child_components(&mut self, block_component: BlockComponent<T::EthSpec>) -> bool {
match block_component {
@@ -244,7 +254,10 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
/// The state of the blob request component of a `SingleBlockLookup`.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BlobRequestState<E: EthSpec> {
/// Block root associated with this blob request. Excluded from the derived `Debug` output.
#[derivative(Debug = "ignore")]
pub block_root: Hash256,
/// Lookup request state whose payload type is the list of blob sidecars.
pub state: SingleLookupRequestState<FixedBlobSidecarList<E>>,
}
@@ -259,7 +272,10 @@ impl<E: EthSpec> BlobRequestState<E> {
}
/// The state of the block request component of a `SingleBlockLookup`.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BlockRequestState<E: EthSpec> {
/// Block root associated with this block request. Excluded from the derived `Debug` output.
#[derivative(Debug = "ignore")]
pub requested_block_root: Hash256,
/// Lookup request state whose payload type is the signed beacon block.
pub state: SingleLookupRequestState<Arc<SignedBeaconBlock<E>>>,
}
@@ -281,7 +297,7 @@ pub struct DownloadResult<T: Clone> {
pub peer_id: PeerId,
}
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
#[derive(PartialEq, Eq, IntoStaticStr)]
pub enum State<T: Clone> {
AwaitingDownload,
Downloading(ReqId),
@@ -293,13 +309,16 @@ pub enum State<T: Clone> {
}
/// Object representing the state of a single block or blob lookup request.
#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Derivative)]
#[derivative(Debug)]
pub struct SingleLookupRequestState<T: Clone> {
/// State of this request.
state: State<T>,
/// Peers that should have this block or blob.
#[derivative(Debug(format_with = "fmt_peer_set"))]
available_peers: HashSet<PeerId>,
/// Peers from which we have requested this block.
#[derivative(Debug = "ignore")]
used_peers: HashSet<PeerId>,
/// How many times have we attempted to process this block or blob.
failed_processing: u8,
@@ -529,8 +548,30 @@ impl<T: Clone> SingleLookupRequestState<T> {
}
}
// Display is used in the BadState assertions above
impl<T: Clone> std::fmt::Display for State<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The static name is obtained through the `Into<&'static str>` conversion on `&State`.
        let variant_name: &'static str = self.into();
        f.write_str(variant_name)
    }
}
// Debug is used by the log_stuck_lookups print to include some more info. It is implemented
// by hand so that an entire block or blob, which adds no valuable data, is never dumped to
// the terminal: only the request id or the peer id is printed next to the variant name.
impl<T: Clone> std::fmt::Debug for State<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Variants printed by name only.
            Self::AwaitingDownload { .. } => f.write_str("AwaitingDownload"),
            Self::Processed { .. } => f.write_str("Processed"),
            // Variants printed with the request id or the peer the data came from.
            Self::Downloading(req_id) => write!(f, "Downloading({:?})", req_id),
            Self::AwaitingProcess(result) => {
                write!(f, "AwaitingProcess({:?})", result.peer_id)
            }
            Self::Processing(result) => write!(f, "Processing({:?})", result.peer_id),
        }
    }
}
/// Formats a peer set as just its element count, so the individual peer ids are not printed.
fn fmt_peer_set(peer_set: &HashSet<PeerId>, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    let peer_count = peer_set.len();
    write!(f, "{}", peer_count)
}

View File

@@ -547,6 +547,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
futures::stream::iter(ee_responsiveness_watch.await).flatten()
};
// LOOKUP_MAX_DURATION_SECS is 60 seconds. Logging every 30 seconds allows enough timely
// visibility while being sparse and not increasing the debug log volume in a noticeable way
let mut interval = tokio::time::interval(Duration::from_secs(30));
// process any inbound messages
loop {
tokio::select! {
@@ -556,6 +560,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
Some(engine_state) = check_ee_stream.next(), if check_ee => {
self.handle_new_execution_engine_state(engine_state);
}
_ = interval.tick() => {
self.block_lookups.log_stuck_lookups();
}
}
}
}