Gloas lookup sync (#9155)

Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>

Co-Authored-By: Pawan Dhananjay <pawandhananjay@gmail.com>
This commit is contained in:
Lion - dapplion
2026-06-10 03:41:26 +02:00
committed by GitHub
parent ebe5ded2fa
commit 47e0901965
19 changed files with 1075 additions and 193 deletions

View File

@@ -22,13 +22,16 @@
use self::parent_chain::{NodeChain, compute_parent_chains};
pub use self::single_block_lookup::DownloadResult;
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
use self::single_block_lookup::{LookupRequestError, PeerType, SingleBlockLookup};
use super::manager::{BlockProcessType, SLOT_IMPORT_TOLERANCE};
use super::network_context::{RpcResponseError, SyncNetworkContext};
use crate::metrics;
use crate::network_beacon_processor::BlockProcessingResult;
use crate::sync::SyncMessage;
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
use crate::sync::block_lookups::single_block_lookup::{
AwaitingParent, ImportedParent, LookupResult,
};
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::PeerId;
@@ -39,7 +42,10 @@ use std::sync::Arc;
use std::time::Duration;
use store::Hash256;
use tracing::{debug, error, warn};
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock};
use types::{
DataColumnSidecarList, EthSpec, ExecutionBlockHash, SignedBeaconBlock,
SignedExecutionPayloadEnvelope,
};
pub mod parent_chain;
mod single_block_lookup;
@@ -73,6 +79,8 @@ const MAX_LOOKUPS: usize = 200;
type BlockDownloadResponse<E> = Result<DownloadResult<Arc<SignedBeaconBlock<E>>>, RpcResponseError>;
type CustodyDownloadResponse<E> =
Result<DownloadResult<DataColumnSidecarList<E>>, RpcResponseError>;
type PayloadDownloadResponse<E> =
Result<DownloadResult<Arc<SignedExecutionPayloadEnvelope<E>>>, RpcResponseError>;
pub enum BlockComponent<E: EthSpec> {
Block(DownloadResult<Arc<SignedBeaconBlock<E>>>),
@@ -169,22 +177,29 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
block_root: Hash256,
block_component: BlockComponent<T::EthSpec>,
parent_root: Hash256,
parent_block_hash: Option<ExecutionBlockHash>,
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
) -> bool {
let parent_lookup_exists =
self.search_parent_of_child(parent_root, block_root, &[peer_id], cx);
let parent_lookup_exists = self.search_parent_of_child(
parent_root,
&PeerType::new(parent_block_hash),
block_root,
&[peer_id],
cx,
);
// Only create the child lookup if the parent exists
if parent_lookup_exists {
// `search_parent_of_child` ensures that the parent lookup exists so we can safely wait for it
self.new_current_lookup(
block_root,
Some(block_component),
Some(parent_root),
Some(AwaitingParent::new(parent_root, parent_block_hash)),
// On a `UnknownParentBlock` or `UnknownParentSidecarHeader` event the peer is not
// required to have the rest of the block components. Create the lookup with zero
// peers to house the block components.
&[],
&PeerType::Block,
cx,
)
} else {
@@ -202,7 +217,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peer_source: &[PeerId],
cx: &mut SyncNetworkContext<T>,
) -> bool {
self.new_current_lookup(block_root, None, None, peer_source, cx)
self.new_current_lookup(block_root, None, None, peer_source, &PeerType::Block, cx)
}
/// A block or blob triggers the search of a parent.
@@ -215,6 +230,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn search_parent_of_child(
&mut self,
block_root_to_search: Hash256,
peer_type: &PeerType,
child_block_root_trigger: Hash256,
peers: &[PeerId],
cx: &mut SyncNetworkContext<T>,
@@ -307,7 +323,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
// `block_root_to_search` is a failed chain check happens inside new_current_lookup
self.new_current_lookup(block_root_to_search, None, None, peers, cx)
self.new_current_lookup(block_root_to_search, None, None, peers, peer_type, cx)
}
/// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is
@@ -318,8 +334,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
block_root: Hash256,
block_component: Option<BlockComponent<T::EthSpec>>,
awaiting_parent: Option<Hash256>,
awaiting_parent: Option<AwaitingParent>,
peers: &[PeerId],
peer_type: &PeerType,
cx: &mut SyncNetworkContext<T>,
) -> bool {
// If this block or it's parent is part of a known ignored chain, ignore it.
@@ -341,7 +358,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) {
if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, peer_type, cx)
{
warn!(error = ?e, "Error adding peers to ancestor lookup");
}
@@ -353,7 +371,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&& !self
.single_block_lookups
.iter()
.any(|(_, lookup)| lookup.is_for_block(awaiting_parent))
.any(|(_, lookup)| lookup.block_root() == awaiting_parent.parent_root())
{
warn!(block_root = ?awaiting_parent, "Ignoring child lookup parent lookup not found");
return false;
@@ -368,7 +386,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve),
// signal here to hold processing downloaded data.
let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent);
let mut lookup =
SingleBlockLookup::new(block_root, peers, peer_type, cx.next_id(), awaiting_parent);
let _guard = lookup.span.clone().entered();
// Add block components to the new request
@@ -389,9 +408,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
debug!(
?peers,
?block_root,
awaiting_parent = awaiting_parent
.map(|root| root.to_string())
.unwrap_or("none".to_owned()),
?awaiting_parent,
id = lookup.id,
"Created block lookup"
);
@@ -438,6 +455,23 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.on_lookup_result(id.lookup_id, result, "custody_download_response", cx);
}
pub fn on_payload_download_response(
&mut self,
id: SingleLookupReqId,
response: PayloadDownloadResponse<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) {
let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else {
debug!(
?id,
"Payload envelope returned for a lookup id that doesn't exist"
);
return;
};
let result = lookup.on_payload_download_response(id.req_id, response, cx);
self.on_lookup_result(id.lookup_id, result, "payload_download_response", cx);
}
/* Error responses */
pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
@@ -472,25 +506,62 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
let lookup_result = match process_type {
BlockProcessType::SingleBlock { .. } => lookup.on_block_processing_result(result, cx),
BlockProcessType::SingleBlock { .. } => {
// Update the result of the lookup first, here we may start the download of Gloas
// payload, which may error.
let lookup_result = lookup.on_block_processing_result(result.clone(), cx);
let lookup_is_awaiting_event = lookup.is_awaiting_event();
let block_root = lookup.block_root();
// Then, as a side-effect continue the EMPTY children of this lookup. Only if the
// block just imported which ensures we just do it once per lookup.
if let BlockProcessingResult::Imported(..) = result
&& let Some(bid_block_hash) = lookup.peek_downloaded_bid_block_hash()
{
self.continue_child_lookups(
block_root,
ImportedParent::OnlyGloasBlock(bid_block_hash),
cx,
);
}
// Then if this lookup happens to have only empty children we can remove it now. We
// must make sure that no other lookup is awaiting this one, and that no requests
// are on-going.
if !lookup_is_awaiting_event && !self.has_any_awaiting_children(block_root) {
Ok(LookupResult::Completed)
} else {
lookup_result
}
}
BlockProcessType::SingleCustodyColumn(_) => {
lookup.on_data_processing_result(result, cx)
}
// TODO(gloas): route into the payload envelope lookup state machine.
BlockProcessType::SinglePayloadEnvelope(_) => Ok(LookupResult::Pending),
BlockProcessType::SinglePayloadEnvelope(_) => {
lookup.on_payload_processing_result(result, cx)
}
};
self.on_lookup_result(lookup_id, lookup_result, "processing_result", cx);
}
pub fn has_any_awaiting_children(&self, block_root: Hash256) -> bool {
self.single_block_lookups
.iter()
.any(|(_, lookup)| lookup.is_awaiting_block(block_root))
}
/// Makes progress on the immediate children of `block_root`
pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext<T>) {
pub fn continue_child_lookups(
&mut self,
parent_root: Hash256,
imported_parent: ImportedParent,
cx: &mut SyncNetworkContext<T>,
) {
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self
for (id, lookup) in self.single_block_lookups.iter_mut() {
if lookup.awaiting_parent() == Some(block_root) {
if lookup.is_awaiting_parent(parent_root, imported_parent) {
lookup.resolve_awaiting_parent();
debug!(
parent_root = ?block_root,
?imported_parent,
id,
block_root = ?lookup.block_root(),
"Continuing child lookup"
@@ -523,7 +594,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let child_lookups = self
.single_block_lookups
.iter()
.filter(|(_, lookup)| lookup.awaiting_parent() == Some(dropped_lookup.block_root()))
.filter(|(_, lookup)| lookup.is_awaiting_block(dropped_lookup.block_root()))
.map(|(id, _)| *id)
.collect::<Vec<_>>();
@@ -546,10 +617,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Ok(LookupResult::Pending) => true,
Ok(LookupResult::ParentUnknown {
parent_root,
parent_block_hash,
block_root,
peers,
}) => {
if self.search_parent_of_child(parent_root, block_root, &peers, cx) {
if self.search_parent_of_child(
parent_root,
&PeerType::new(parent_block_hash),
block_root,
&peers,
cx,
) {
true
} else {
self.drop_lookup_and_children(id, "Failed");
@@ -567,16 +645,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED);
self.metrics.completed_lookups += 1;
// Block imported, continue the requests of pending child blocks
self.continue_child_lookups(lookup.block_root(), cx);
self.continue_child_lookups(
lookup.block_root(),
ImportedParent::LookupComplete,
cx,
);
self.update_metrics();
} else {
debug!(id, "Attempting to drop non-existent lookup");
}
false
}
// If UnknownLookup do not log the request error. No need to drop child lookups nor
// update metrics because the lookup does not exist.
Err(LookupRequestError::UnknownLookup) => false,
Err(error) => {
debug!(id, source, ?error, "Dropping lookup on request error");
self.drop_lookup_and_children(id, error.into());
@@ -708,7 +787,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
if let Some(lookup) = self
.single_block_lookups
.values()
.find(|l| l.block_root() == awaiting_parent)
.find(|l| l.block_root() == awaiting_parent.parent_root())
{
self.find_oldest_ancestor_lookup(lookup)
} else {
@@ -729,6 +808,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
lookup_id: SingleLookupId,
peers: &[PeerId],
peer_type: &PeerType,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), String> {
let lookup = self
@@ -738,7 +818,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let mut added_some_peer = false;
for peer in peers {
if lookup.add_peer(*peer) {
if lookup.add_peer(*peer, peer_type) {
added_some_peer = true;
debug!(
block_root = ?lookup.block_root(),
@@ -748,15 +828,21 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
if let Some(parent_root) = lookup.awaiting_parent() {
if let Some(&awaiting_parent) = lookup.awaiting_parent() {
// Regardless of gloas full/empty the lookup to add peers to is keyed by block_root
if let Some((&parent_id, _)) = self
.single_block_lookups
.iter()
.find(|(_, l)| l.block_root() == parent_root)
.find(|(_, l)| l.block_root() == awaiting_parent.parent_root())
{
self.add_peers_to_lookup_and_ancestors(parent_id, peers, cx)
self.add_peers_to_lookup_and_ancestors(
parent_id,
peers,
&awaiting_parent.into_peer_type(),
cx,
)
} else {
Err(format!("Lookup references unknown parent {parent_root:?}"))
Err(format!("Lookup references unknown {awaiting_parent:?}"))
}
} else if added_some_peer {
// If this lookup is not awaiting a parent and we added at least one peer, attempt to