refactor awaiting_parent field and some metrics

This commit is contained in:
Eitan Seri- Levi
2026-04-03 01:02:57 -07:00
parent 214e3ce9f0
commit f897215684
4 changed files with 69 additions and 53 deletions

View File

@@ -22,7 +22,9 @@
use self::parent_chain::{NodeChain, compute_parent_chains};
pub use self::single_block_lookup::DownloadResult;
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
use self::single_block_lookup::{
AwaitingParent, LookupRequestError, LookupResult, SingleBlockLookup,
};
use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE};
use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext};
use crate::metrics;
@@ -216,8 +218,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.new_current_lookup(
block_root,
Some(block_component),
Some(parent_root),
None,
Some(AwaitingParent::Block(parent_root)),
// On a `UnknownParentBlock` or `UnknownParentBlob` event the peer is not required
// to have the rest of the block components (refer to decoupled blob gossip). Create
// the lookup with zero peers to house the block components.
@@ -250,8 +251,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.new_current_lookup(
block_root,
Some(block_component),
None, // not awaiting parent block
Some(parent_root),
Some(AwaitingParent::Envelope(parent_root)),
&[],
cx,
)
@@ -270,7 +270,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peer_source: &[PeerId],
cx: &mut SyncNetworkContext<T>,
) -> bool {
self.new_current_lookup(block_root, None, None, None, peer_source, cx)
self.new_current_lookup(block_root, None, None, peer_source, cx)
}
/// A block or blob triggers the search of a parent.
@@ -375,7 +375,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
// `block_root_to_search` is a failed chain check happens inside new_current_lookup
self.new_current_lookup(block_root_to_search, None, None, None, peers, cx)
self.new_current_lookup(block_root_to_search, None, None, peers, cx)
}
/// A block triggers the search of a parent envelope.
@@ -437,8 +437,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
block_root: Hash256,
block_component: Option<BlockComponent<T::EthSpec>>,
awaiting_parent: Option<Hash256>,
awaiting_parent_envelope: Option<Hash256>,
awaiting_parent: Option<AwaitingParent>,
peers: &[PeerId],
cx: &mut SyncNetworkContext<T>,
) -> bool {
@@ -473,13 +472,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
// Ensure that awaiting parent exists, otherwise this lookup won't be able to make progress
if let Some(awaiting_parent) = awaiting_parent
if let Some(AwaitingParent::Block(parent_root) | AwaitingParent::Envelope(parent_root)) =
awaiting_parent
&& !self
.single_block_lookups
.iter()
.any(|(_, lookup)| lookup.is_for_block(awaiting_parent))
.any(|(_, lookup)| lookup.is_for_block(parent_root))
{
warn!(block_root = ?awaiting_parent, "Ignoring child lookup parent lookup not found");
warn!(block_root = ?parent_root, "Ignoring child lookup parent lookup not found");
return false;
}
@@ -493,9 +493,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve),
// signal here to hold processing downloaded data.
let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent);
if let Some(parent_root) = awaiting_parent_envelope {
lookup.set_awaiting_parent_envelope(parent_root);
}
let _guard = lookup.span.clone().entered();
// Add block components to the new request
@@ -516,9 +513,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
debug!(
?peers,
?block_root,
awaiting_parent = awaiting_parent
.map(|root| root.to_string())
.unwrap_or("none".to_owned()),
?awaiting_parent,
id = lookup.id,
"Created block lookup"
);
@@ -936,7 +931,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self
for (id, lookup) in self.single_block_lookups.iter_mut() {
if lookup.awaiting_parent() == Some(block_root) {
if lookup.awaiting_parent_block() == Some(block_root) {
lookup.resolve_awaiting_parent();
debug!(
parent_root = ?block_root,
@@ -964,7 +959,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
for (id, lookup) in self.single_block_lookups.iter_mut() {
if lookup.awaiting_parent_envelope() == Some(block_root) {
lookup.resolve_awaiting_parent_envelope();
lookup.resolve_awaiting_parent();
debug!(
envelope_root = ?block_root,
id,
@@ -996,12 +991,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[reason]);
self.metrics.dropped_lookups += 1;
let dropped_root = dropped_lookup.block_root();
let child_lookups = self
.single_block_lookups
.iter()
.filter(|(_, lookup)| {
lookup.awaiting_parent() == Some(dropped_lookup.block_root())
|| lookup.awaiting_parent_envelope() == Some(dropped_lookup.block_root())
lookup.awaiting_parent_block() == Some(dropped_root)
|| lookup.awaiting_parent_envelope() == Some(dropped_root)
})
.map(|(id, _)| *id)
.collect::<Vec<_>>();
@@ -1170,17 +1166,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&'a self,
lookup: &'a SingleBlockLookup<T>,
) -> Result<&'a SingleBlockLookup<T>, String> {
if let Some(awaiting_parent) = lookup.awaiting_parent() {
if let Some(parent_root) = lookup.awaiting_parent_block() {
if let Some(lookup) = self
.single_block_lookups
.values()
.find(|l| l.block_root() == awaiting_parent)
.find(|l| l.block_root() == parent_root)
{
self.find_oldest_ancestor_lookup(lookup)
} else {
Err(format!(
"Lookup references unknown parent {awaiting_parent:?}"
))
Err(format!("Lookup references unknown parent {parent_root:?}"))
}
} else {
Ok(lookup)
@@ -1213,7 +1207,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
if let Some(parent_root) = lookup.awaiting_parent() {
if let Some(parent_root) = lookup.awaiting_parent_block() {
if let Some((&child_id, _)) = self
.single_block_lookups
.iter()

View File

@@ -13,7 +13,7 @@ impl<T: BeaconChainTypes> From<&SingleBlockLookup<T>> for Node {
fn from(value: &SingleBlockLookup<T>) -> Self {
Self {
block_root: value.block_root(),
parent_root: value.awaiting_parent(),
parent_root: value.awaiting_parent_block(),
}
}
}

View File

@@ -58,6 +58,14 @@ pub enum LookupRequestError {
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AwaitingParent {
/// Waiting for the parent block to be imported.
Block(Hash256),
/// The parent block is imported but its execution payload envelope is missing.
Envelope(Hash256),
}
#[derive(Educe)]
#[educe(Debug(bound(T: BeaconChainTypes)))]
pub struct SingleBlockLookup<T: BeaconChainTypes> {
@@ -71,8 +79,7 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
#[educe(Debug(method(fmt_peer_set_as_len)))]
peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256,
awaiting_parent: Option<Hash256>,
awaiting_parent_envelope: Option<Hash256>,
awaiting_parent: Option<AwaitingParent>,
created: Instant,
pub(crate) span: Span,
}
@@ -93,7 +100,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
requested_block_root: Hash256,
peers: &[PeerId],
id: Id,
awaiting_parent: Option<Hash256>,
awaiting_parent: Option<AwaitingParent>,
) -> Self {
let lookup_span = debug_span!(
"lh_single_block_lookup",
@@ -108,7 +115,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))),
block_root: requested_block_root,
awaiting_parent,
awaiting_parent_envelope: None,
created: Instant::now(),
span: lookup_span,
}
@@ -131,7 +137,16 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Reset the status of all internal requests
pub fn reset_requests(&mut self) {
self.block_request_state = BlockRequestState::new(self.block_root);
self.component_requests = ComponentRequests::WaitingForBlock;
match &self.component_requests {
ComponentRequests::ActiveEnvelopeRequest(_) => {
self.component_requests = ComponentRequests::ActiveEnvelopeRequest(
EnvelopeRequestState::new(self.block_root),
);
}
_ => {
self.component_requests = ComponentRequests::WaitingForBlock;
}
}
}
/// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing`
@@ -147,34 +162,39 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.block_root
}
pub fn awaiting_parent(&self) -> Option<Hash256> {
pub fn awaiting_parent(&self) -> Option<AwaitingParent> {
self.awaiting_parent
}
/// Mark this lookup as awaiting a parent lookup from being processed. Meanwhile don't send
/// components for processing.
pub fn set_awaiting_parent(&mut self, parent_root: Hash256) {
self.awaiting_parent = Some(parent_root)
}
/// Mark this lookup as no longer awaiting a parent lookup. Components can be sent for
/// processing.
pub fn resolve_awaiting_parent(&mut self) {
self.awaiting_parent = None;
/// Returns the parent root if awaiting a parent block.
pub fn awaiting_parent_block(&self) -> Option<Hash256> {
match self.awaiting_parent {
Some(AwaitingParent::Block(root)) => Some(root),
_ => None,
}
}
/// Returns the parent root if awaiting a parent envelope.
pub fn awaiting_parent_envelope(&self) -> Option<Hash256> {
self.awaiting_parent_envelope
match self.awaiting_parent {
Some(AwaitingParent::Envelope(root)) => Some(root),
_ => None,
}
}
/// Mark this lookup as awaiting a parent block to be imported before processing.
pub fn set_awaiting_parent(&mut self, parent_root: Hash256) {
self.awaiting_parent = Some(AwaitingParent::Block(parent_root));
}
/// Mark this lookup as awaiting a parent envelope to be imported before processing.
pub fn set_awaiting_parent_envelope(&mut self, parent_root: Hash256) {
self.awaiting_parent_envelope = Some(parent_root);
self.awaiting_parent = Some(AwaitingParent::Envelope(parent_root));
}
/// Mark this lookup as no longer awaiting a parent envelope.
pub fn resolve_awaiting_parent_envelope(&mut self) {
self.awaiting_parent_envelope = None;
/// Mark this lookup as no longer awaiting any parent.
pub fn resolve_awaiting_parent(&mut self) {
self.awaiting_parent = None;
}
/// Returns the time elapsed since this lookup was created
@@ -219,7 +239,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Returns true if this request is expecting some event to make progress
pub fn is_awaiting_event(&self) -> bool {
self.awaiting_parent.is_some()
|| self.awaiting_parent_envelope.is_some()
|| self.block_request_state.state.is_awaiting_event()
|| match &self.component_requests {
// If components are waiting for the block request to complete, here we should
@@ -328,8 +347,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
expected_blobs: usize,
) -> Result<(), LookupRequestError> {
let id = self.id;
let awaiting_event =
self.awaiting_parent.is_some() || self.awaiting_parent_envelope.is_some();
let awaiting_event = self.awaiting_parent.is_some();
let request =
R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?;

View File

@@ -1917,6 +1917,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
"data_columns_by_range",
self.data_columns_by_range_requests.len(),
),
(
"payload_envelopes_by_root",
self.payload_envelopes_by_root_requests.len(),
),
("custody_by_root", self.custody_by_root_requests.len()),
(
"components_by_range",