mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-15 17:58:23 +00:00
Complete gloas child logic
This commit is contained in:
@@ -28,7 +28,7 @@ use logging::crit;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::{debug, debug_span, error, info, instrument, warn};
|
||||
use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256};
|
||||
use types::{BlockImportSource, DataColumnSidecarList, Epoch, ExecutionBlockHash, Hash256};
|
||||
|
||||
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
@@ -962,13 +962,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
|
||||
/// The classified outcome of submitting a block / blob / column for processing, ready for the
|
||||
/// lookup state machine to act on without re-inspecting `BlockError`.
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum BlockProcessingResult {
|
||||
/// `fully_imported` is true if the lookup is complete; false if `MissingComponents` (the
|
||||
/// lookup must keep fetching). `info` is a stable label for logs / metrics.
|
||||
Imported(bool, &'static str),
|
||||
ParentUnknown {
|
||||
parent_root: Hash256,
|
||||
parent_block_hash: Option<ExecutionBlockHash>,
|
||||
},
|
||||
/// Processing failed. `penalty` is `Some` when an attributable peer should be downscored;
|
||||
/// the third tuple element is the `report_peer` telemetry msg. `reason` is for logs only.
|
||||
@@ -1000,9 +1001,13 @@ impl From<Result<AvailabilityProcessingStatus, BlockError>> for BlockProcessingR
|
||||
return Self::Imported(true, "duplicate");
|
||||
}
|
||||
BlockError::GenesisBlock => return Self::Imported(true, "genesis"),
|
||||
BlockError::ParentUnknown { parent_root } => {
|
||||
BlockError::ParentUnknown {
|
||||
parent_root,
|
||||
parent_block_hash,
|
||||
} => {
|
||||
return Self::ParentUnknown {
|
||||
parent_root: *parent_root,
|
||||
parent_block_hash: *parent_block_hash,
|
||||
};
|
||||
}
|
||||
BlockError::BeaconChainError(_) | BlockError::InternalError(_) => None,
|
||||
|
||||
@@ -22,13 +22,14 @@
|
||||
|
||||
use self::parent_chain::{NodeChain, compute_parent_chains};
|
||||
pub use self::single_block_lookup::DownloadResult;
|
||||
use self::single_block_lookup::{LookupRequestError, LookupResult, PeerType, SingleBlockLookup};
|
||||
use self::single_block_lookup::{LookupRequestError, PeerType, SingleBlockLookup};
|
||||
use super::manager::{BlockProcessType, SLOT_IMPORT_TOLERANCE};
|
||||
use super::network_context::{RpcResponseError, SyncNetworkContext};
|
||||
use crate::metrics;
|
||||
use crate::network_beacon_processor::BlockProcessingResult;
|
||||
use crate::sync::SyncMessage;
|
||||
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
|
||||
use crate::sync::block_lookups::single_block_lookup::{AwaitingParent, ImportedAction};
|
||||
use beacon_chain::BeaconChainTypes;
|
||||
use fnv::FnvHashMap;
|
||||
use lighthouse_network::PeerId;
|
||||
@@ -191,7 +192,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
self.new_current_lookup(
|
||||
block_root,
|
||||
Some(block_component),
|
||||
Some(parent_root),
|
||||
Some(AwaitingParent::new(parent_root, parent_block_hash)),
|
||||
// On a `UnknownParentBlock` or `UnknownParentSidecarHeader` event the peer is not
|
||||
// required to have the rest of the block components. Create the lookup with zero
|
||||
// peers to house the block components. We don't know the child's fork yet, so use
|
||||
@@ -336,7 +337,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
&mut self,
|
||||
block_root: Hash256,
|
||||
block_component: Option<BlockComponent<T::EthSpec>>,
|
||||
awaiting_parent: Option<Hash256>,
|
||||
awaiting_parent: Option<AwaitingParent>,
|
||||
peers: &[PeerId],
|
||||
peer_type: &PeerType,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
@@ -373,7 +374,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
&& !self
|
||||
.single_block_lookups
|
||||
.iter()
|
||||
.any(|(_, lookup)| lookup.is_for_block(awaiting_parent))
|
||||
.any(|(_, lookup)| lookup.is_for_block(awaiting_parent.parent_root()))
|
||||
{
|
||||
warn!(block_root = ?awaiting_parent, "Ignoring child lookup parent lookup not found");
|
||||
return false;
|
||||
@@ -410,9 +411,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
debug!(
|
||||
?peers,
|
||||
?block_root,
|
||||
awaiting_parent = awaiting_parent
|
||||
.map(|root| root.to_string())
|
||||
.unwrap_or("none".to_owned()),
|
||||
?awaiting_parent,
|
||||
id = lookup.id,
|
||||
"Created block lookup"
|
||||
);
|
||||
@@ -495,40 +494,90 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
result: BlockProcessingResult,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
let lookup_id = process_type.id();
|
||||
let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else {
|
||||
debug!(id = lookup_id, "Unknown single block lookup");
|
||||
let id = process_type.id();
|
||||
let Some(lookup) = self.single_block_lookups.get_mut(&id) else {
|
||||
debug!(id, "Unknown single block lookup");
|
||||
return;
|
||||
};
|
||||
let block_root = lookup.block_root();
|
||||
|
||||
debug!(
|
||||
block_root = ?lookup.block_root(),
|
||||
id = lookup_id,
|
||||
?block_root,
|
||||
id,
|
||||
?process_type,
|
||||
?result,
|
||||
"Received lookup processing result"
|
||||
);
|
||||
|
||||
let block_root = lookup.block_root();
|
||||
// Gloas: a block imports into fork choice on block + columns, *before* its payload
|
||||
// envelope. Children awaiting it must re-evaluate at that point: an EMPTY child can import
|
||||
// on the parent block alone, while a FULL child re-awaits the parent's payload.
|
||||
let block_imported = matches!(process_type, BlockProcessType::SingleBlock { .. })
|
||||
&& matches!(result, BlockProcessingResult::Imported(..));
|
||||
|
||||
let lookup_result = match process_type {
|
||||
BlockProcessType::SingleBlock { .. } => lookup.on_block_processing_result(result, cx),
|
||||
BlockProcessType::SingleBlock { .. } => lookup.on_block_processing_result(&result, cx),
|
||||
BlockProcessType::SingleCustodyColumn(_) => {
|
||||
lookup.on_data_processing_result(result, cx)
|
||||
lookup.on_data_processing_result(&result, cx)
|
||||
}
|
||||
BlockProcessType::SinglePayloadEnvelope(_) => {
|
||||
lookup.on_payload_processing_result(result, cx)
|
||||
lookup.on_payload_processing_result(&result, cx)
|
||||
}
|
||||
};
|
||||
self.on_lookup_result(lookup_id, lookup_result, "processing_result", cx);
|
||||
if block_imported {
|
||||
self.continue_child_lookups(block_root, cx);
|
||||
|
||||
match &result {
|
||||
BlockProcessingResult::Imported(_, _) => {
|
||||
// Some component got imported potentially continue
|
||||
if lookup.is_complete() {
|
||||
if let Some(_) = self.single_block_lookups.remove(&id) {
|
||||
debug!(?block_root, id, "Dropping completed lookup");
|
||||
metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED);
|
||||
self.metrics.completed_lookups += 1;
|
||||
// Block imported, continue the requests of pending child blocks
|
||||
self.continue_child_lookups(
|
||||
ImportedAction::LookupComplete { block_root },
|
||||
cx,
|
||||
);
|
||||
self.update_metrics();
|
||||
} else {
|
||||
debug!(id, "Attempting to drop non-existent lookup");
|
||||
}
|
||||
} else if matches!(process_type, BlockProcessType::SingleBlock { .. }) {
|
||||
if let Some(bid_block_hash) = lookup.peek_downloaded_bid_block_hash() {
|
||||
// Continue child lookups for empty children
|
||||
self.continue_child_lookups(
|
||||
ImportedAction::GloasBlockComplete {
|
||||
block_root,
|
||||
bid_block_hash,
|
||||
},
|
||||
cx,
|
||||
);
|
||||
if !self.has_any_awaiting_children(block_root) {
|
||||
self.single_block_lookups.remove(&id);
|
||||
debug!(
|
||||
?block_root,
|
||||
id, "Dropping completed lookup after gloas block"
|
||||
);
|
||||
}
|
||||
self.update_metrics();
|
||||
}
|
||||
}
|
||||
}
|
||||
BlockProcessingResult::ParentUnknown {
|
||||
parent_root,
|
||||
parent_block_hash,
|
||||
} => {
|
||||
// Parent unknown error, create parent lookup
|
||||
let peers = lookup.all_peers();
|
||||
if !self.search_parent_of_child(
|
||||
*parent_root,
|
||||
&PeerType::new(*parent_block_hash),
|
||||
block_root,
|
||||
&peers,
|
||||
cx,
|
||||
) {
|
||||
self.drop_lookup_and_children(id, "Failed");
|
||||
self.update_metrics();
|
||||
}
|
||||
}
|
||||
BlockProcessingResult::Error { .. } => {}
|
||||
}
|
||||
|
||||
self.on_lookup_result(id, lookup_result, "processing_result", cx);
|
||||
}
|
||||
|
||||
pub fn on_external_processing_result(
|
||||
@@ -546,8 +595,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
return;
|
||||
};
|
||||
|
||||
// TOOD(gloas): This is broken... Getting a block processed result must not complete the
|
||||
// entire post-gloas lookup
|
||||
let lookup_result = if imported {
|
||||
Ok(LookupResult::Completed)
|
||||
Ok(())
|
||||
} else {
|
||||
// A lookup may be in the following state:
|
||||
// - Block awaiting processing from a different source
|
||||
@@ -564,15 +615,28 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
self.on_lookup_result(id, lookup_result, "external_processing_result", cx);
|
||||
}
|
||||
|
||||
pub fn has_any_awaiting_children(&self, block_root: Hash256) -> bool {
|
||||
self.single_block_lookups
|
||||
.iter()
|
||||
.any(|(_, lookup)| lookup.is_awaiting_block(block_root))
|
||||
}
|
||||
|
||||
/// Makes progress on the immediate children of `block_root`
|
||||
pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext<T>) {
|
||||
pub fn continue_child_lookups(
|
||||
&mut self,
|
||||
import_action: ImportedAction,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self
|
||||
|
||||
for (id, lookup) in self.single_block_lookups.iter_mut() {
|
||||
if lookup.awaiting_parent() == Some(block_root) {
|
||||
lookup.resolve_awaiting_parent();
|
||||
// If lookup is awaiting parent?
|
||||
// - If Some
|
||||
// - If parent_root lookup got block
|
||||
// - Check if the child is FULL, if so keep waiting, otherwise continue and resolve
|
||||
if lookup.maybe_resolve_awaiting_parent(import_action) {
|
||||
debug!(
|
||||
parent_root = ?block_root,
|
||||
?import_action,
|
||||
id,
|
||||
block_root = ?lookup.block_root(),
|
||||
"Continuing child lookup"
|
||||
@@ -605,7 +669,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
let child_lookups = self
|
||||
.single_block_lookups
|
||||
.iter()
|
||||
.filter(|(_, lookup)| lookup.awaiting_parent() == Some(dropped_lookup.block_root()))
|
||||
.filter(|(_, lookup)| lookup.is_awaiting_block(dropped_lookup.block_root()))
|
||||
.map(|(id, _)| *id)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
@@ -620,69 +684,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
fn on_lookup_result(
|
||||
&mut self,
|
||||
id: SingleLookupId,
|
||||
result: Result<LookupResult, LookupRequestError>,
|
||||
result: Result<(), LookupRequestError>,
|
||||
source: &str,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
_cx: &mut SyncNetworkContext<T>,
|
||||
) -> bool {
|
||||
match result {
|
||||
Ok(LookupResult::Pending) => true,
|
||||
Ok(LookupResult::ParentUnknown {
|
||||
parent_root,
|
||||
parent_block_hash,
|
||||
block_root,
|
||||
peers,
|
||||
}) => {
|
||||
if self.search_parent_of_child(
|
||||
parent_root,
|
||||
&PeerType::new(parent_block_hash),
|
||||
block_root,
|
||||
&peers,
|
||||
cx,
|
||||
) {
|
||||
true
|
||||
} else {
|
||||
self.drop_lookup_and_children(id, "Failed");
|
||||
self.update_metrics();
|
||||
false
|
||||
}
|
||||
}
|
||||
Ok(LookupResult::Completed) => {
|
||||
if let Some(lookup) = self.single_block_lookups.remove(&id) {
|
||||
debug!(
|
||||
block = ?lookup.block_root(),
|
||||
id,
|
||||
"Dropping completed lookup"
|
||||
);
|
||||
metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED);
|
||||
self.metrics.completed_lookups += 1;
|
||||
// Block imported, continue the requests of pending child blocks
|
||||
self.continue_child_lookups(lookup.block_root(), cx);
|
||||
self.update_metrics();
|
||||
} else {
|
||||
debug!(id, "Attempting to drop non-existent lookup");
|
||||
}
|
||||
false
|
||||
}
|
||||
Ok(_) => true,
|
||||
// If UnknownLookup do not log the request error. No need to drop child lookups nor
|
||||
// update metrics because the lookup does not exist.
|
||||
Err(LookupRequestError::UnknownLookup) => false,
|
||||
Err(error) => {
|
||||
// Retain a failed lookup while another lookup awaits it: a FULL Gloas child awaits
|
||||
// its parent's payload, so the parent's failed payload download must not cascade-
|
||||
// drop the child. The parent stays until its payload arrives (or it is reaped as
|
||||
// stuck).
|
||||
if let Some(block_root) = self.single_block_lookups.get(&id).map(|l| l.block_root())
|
||||
&& self.is_awaited(block_root)
|
||||
{
|
||||
debug!(
|
||||
id,
|
||||
source,
|
||||
?error,
|
||||
?block_root,
|
||||
"Retaining failed lookup awaited by a child"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
debug!(id, source, ?error, "Dropping lookup on request error");
|
||||
self.drop_lookup_and_children(id, error.into());
|
||||
self.update_metrics();
|
||||
@@ -691,13 +701,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if any lookup is awaiting `block_root` as its parent.
|
||||
fn is_awaited(&self, block_root: Hash256) -> bool {
|
||||
self.single_block_lookups
|
||||
.values()
|
||||
.any(|lookup| lookup.awaiting_parent() == Some(block_root))
|
||||
}
|
||||
|
||||
/* Helper functions */
|
||||
|
||||
/// Drops all the single block requests and returns how many requests were dropped.
|
||||
@@ -817,12 +820,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
lookup: &'a SingleBlockLookup<T>,
|
||||
) -> Result<&'a SingleBlockLookup<T>, String> {
|
||||
if let Some(awaiting_parent) = lookup.awaiting_parent() {
|
||||
if let Some(lookup) = self
|
||||
if let Some(parent_lookup) = self
|
||||
.single_block_lookups
|
||||
.values()
|
||||
.find(|l| l.block_root() == awaiting_parent)
|
||||
.find(|l| l.is_parent_of(awaiting_parent))
|
||||
{
|
||||
self.find_oldest_ancestor_lookup(lookup)
|
||||
self.find_oldest_ancestor_lookup(parent_lookup)
|
||||
} else {
|
||||
Err(format!(
|
||||
"Lookup references unknown parent {awaiting_parent:?}"
|
||||
@@ -861,19 +864,22 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(parent_root) = lookup.awaiting_parent() {
|
||||
// When recursing from child to parent, the parent's peer set is keyed by the child's
|
||||
// bid `parent_block_hash` (post-Gloas). A peer that imported this FULL child holds the
|
||||
// parent's payload + columns.
|
||||
let parent_peer_type = lookup.awaiting_parent_peer_type();
|
||||
if let Some((&parent_id, _)) = self
|
||||
if let Some(&awaiting_parent) = lookup.awaiting_parent() {
|
||||
// Regardless of gloas full/empty the lookup to add peers to is keyed by block_root
|
||||
if let Some(parent_id) = self
|
||||
.single_block_lookups
|
||||
.iter()
|
||||
.find(|(_, l)| l.block_root() == parent_root)
|
||||
.find(|(_, l)| l.is_parent_of(&awaiting_parent))
|
||||
.map(|(parent_id, _)| *parent_id)
|
||||
{
|
||||
self.add_peers_to_lookup_and_ancestors(parent_id, peers, &parent_peer_type, cx)
|
||||
self.add_peers_to_lookup_and_ancestors(
|
||||
parent_id,
|
||||
peers,
|
||||
&(&awaiting_parent).into(),
|
||||
cx,
|
||||
)
|
||||
} else {
|
||||
Err(format!("Lookup references unknown parent {parent_root:?}"))
|
||||
Err(format!("Lookup references unknown {awaiting_parent:?}"))
|
||||
}
|
||||
} else if added_some_peer {
|
||||
// If this lookup is not awaiting a parent and we added at least one peer, attempt to
|
||||
|
||||
@@ -13,7 +13,7 @@ impl<T: BeaconChainTypes> From<&SingleBlockLookup<T>> for Node {
|
||||
fn from(value: &SingleBlockLookup<T>) -> Self {
|
||||
Self {
|
||||
block_root: value.block_root(),
|
||||
parent_root: value.awaiting_parent(),
|
||||
parent_root: value.awaiting_parent().map(|a| a.parent_root()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,24 +24,6 @@ use types::{
|
||||
SignedExecutionPayloadEnvelope, Slot,
|
||||
};
|
||||
|
||||
// Dedicated enum for LookupResult to force its usage
|
||||
#[must_use = "LookupResult must be handled with on_lookup_result"]
|
||||
pub enum LookupResult {
|
||||
/// Lookup completed successfully
|
||||
Completed,
|
||||
/// Lookup is expecting some future event from the network
|
||||
Pending,
|
||||
/// Block's parent is not known to fork-choice, a parent lookup is needed
|
||||
ParentUnknown {
|
||||
parent_root: Hash256,
|
||||
/// Post-Gloas only: the child's bid `parent_block_hash`. Lets the parent lookup partition
|
||||
/// peers (a peer that imported this FULL child holds the parent's payload + columns).
|
||||
parent_block_hash: Option<ExecutionBlockHash>,
|
||||
block_root: Hash256,
|
||||
peers: Vec<PeerId>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
|
||||
pub enum LookupRequestError {
|
||||
/// Too many failed attempts
|
||||
@@ -54,8 +36,6 @@ pub enum LookupRequestError {
|
||||
BadState(String),
|
||||
/// Lookup failed for some other reason and should be dropped
|
||||
Failed(/* reason: */ String),
|
||||
/// Attempted to retrieve a not known lookup id
|
||||
UnknownLookup,
|
||||
/// Received a download result for a different request id than the in-flight request.
|
||||
/// There should only exist a single request at a time. Having multiple requests is a bug and
|
||||
/// can result in undefined state, so it's treated as a hard error and the lookup is dropped.
|
||||
@@ -65,6 +45,24 @@ pub enum LookupRequestError {
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct AwaitingParent {
|
||||
parent_root: Hash256,
|
||||
parent_block_hash: Option<ExecutionBlockHash>,
|
||||
}
|
||||
|
||||
impl AwaitingParent {
|
||||
pub fn new(parent_root: Hash256, parent_block_hash: Option<ExecutionBlockHash>) -> Self {
|
||||
Self {
|
||||
parent_root,
|
||||
parent_block_hash,
|
||||
}
|
||||
}
|
||||
pub fn parent_root(&self) -> Hash256 {
|
||||
self.parent_root
|
||||
}
|
||||
}
|
||||
|
||||
type PeerSet = Arc<RwLock<HashSet<PeerId>>>;
|
||||
/// Peers that claim to have imported a FULL child of this lookup's block, keyed by the child's bid
|
||||
/// `parent_block_hash` (which equals this block's bid `block_hash` when the child is FULL). Only
|
||||
@@ -167,6 +165,23 @@ impl PeerType {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&AwaitingParent> for PeerType {
|
||||
fn from(value: &AwaitingParent) -> Self {
|
||||
Self::new(value.parent_block_hash)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum ImportedAction {
|
||||
LookupComplete {
|
||||
block_root: Hash256,
|
||||
},
|
||||
GloasBlockComplete {
|
||||
block_root: Hash256,
|
||||
bid_block_hash: ExecutionBlockHash,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Educe)]
|
||||
#[educe(Debug(bound(T: BeaconChainTypes)))]
|
||||
pub struct SingleBlockLookup<T: BeaconChainTypes> {
|
||||
@@ -186,7 +201,7 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
|
||||
/// block's payload envelope and data columns.
|
||||
#[educe(Debug(method(fmt_peer_map_as_len)))]
|
||||
gloas_child_peers: GloasChildPeers,
|
||||
awaiting_parent: Option<Hash256>,
|
||||
awaiting_parent: Option<AwaitingParent>,
|
||||
created: Instant,
|
||||
pub(crate) span: Span,
|
||||
}
|
||||
@@ -197,7 +212,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
peers: &[PeerId],
|
||||
peer_type: &PeerType,
|
||||
id: Id,
|
||||
awaiting_parent: Option<Hash256>,
|
||||
awaiting_parent: Option<AwaitingParent>,
|
||||
) -> Self {
|
||||
let lookup_span = debug_span!(
|
||||
"lh_single_block_lookup",
|
||||
@@ -243,39 +258,87 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
.map(|block| block.slot())
|
||||
}
|
||||
|
||||
pub fn peek_downloaded_bid_block_hash(&self) -> Option<ExecutionBlockHash> {
|
||||
self.block_request
|
||||
.state
|
||||
.peek_downloaded_data()
|
||||
.and_then(|block| {
|
||||
block
|
||||
.message()
|
||||
.body()
|
||||
.signed_execution_payload_bid()
|
||||
.ok()
|
||||
.map(|bid| bid.message.block_hash)
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the block root that is being requested.
|
||||
pub fn block_root(&self) -> Hash256 {
|
||||
self.block_root
|
||||
}
|
||||
|
||||
pub fn awaiting_parent(&self) -> Option<Hash256> {
|
||||
self.awaiting_parent
|
||||
pub fn is_parent_of(&self, child_awaiting_parent: &AwaitingParent) -> bool {
|
||||
self.block_root == child_awaiting_parent.parent_root
|
||||
}
|
||||
|
||||
pub fn is_awaiting_block(&self, block_root: Hash256) -> bool {
|
||||
if let Some(awaiting_parent) = &self.awaiting_parent {
|
||||
awaiting_parent.parent_root() == block_root
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn awaiting_parent(&self) -> Option<&AwaitingParent> {
|
||||
self.awaiting_parent.as_ref()
|
||||
}
|
||||
|
||||
/// Mark this lookup as awaiting a parent lookup from being processed. Meanwhile don't send
|
||||
/// components for processing.
|
||||
pub fn set_awaiting_parent(&mut self, parent_root: Hash256) {
|
||||
self.awaiting_parent = Some(parent_root);
|
||||
pub fn set_awaiting_parent(&mut self, parent: AwaitingParent) {
|
||||
self.awaiting_parent = Some(parent);
|
||||
}
|
||||
|
||||
/// Mark this lookup as no longer awaiting a parent lookup. Components can be sent for
|
||||
/// processing.
|
||||
pub fn resolve_awaiting_parent(&mut self) {
|
||||
self.awaiting_parent = None;
|
||||
}
|
||||
|
||||
/// This block's bid `parent_block_hash` (the parent's execution hash), derived from the
|
||||
/// downloaded block. Post-Gloas only; `None` pre-Gloas or before the block is downloaded.
|
||||
fn bid_parent_block_hash(&self) -> Option<ExecutionBlockHash> {
|
||||
self.block_request
|
||||
.state
|
||||
.peek_downloaded_data()
|
||||
.and_then(|block| block.parent_block_hash())
|
||||
}
|
||||
|
||||
/// Returns the `PeerType` to use when propagating this lookup's peers up to its parent lookup.
|
||||
pub fn awaiting_parent_peer_type(&self) -> PeerType {
|
||||
PeerType::new(self.bid_parent_block_hash())
|
||||
pub fn maybe_resolve_awaiting_parent(&mut self, action: ImportedAction) -> bool {
|
||||
if let Some(awaiting_parent) = self.awaiting_parent {
|
||||
let should_resolve = match action {
|
||||
ImportedAction::LookupComplete { block_root } => {
|
||||
awaiting_parent.parent_root() == block_root
|
||||
}
|
||||
ImportedAction::GloasBlockComplete {
|
||||
block_root,
|
||||
bid_block_hash,
|
||||
..
|
||||
} => {
|
||||
if awaiting_parent.parent_root() == block_root {
|
||||
if let Some(parent_block_hash) = awaiting_parent.parent_block_hash {
|
||||
// This lookup is the execution child of `parent_execution_hash`. If the
|
||||
// parent hash the same `bid_block_hash` this is FULL child and we must wait
|
||||
// for the entire parent lookup to be imported. Otherwise it's a EMPTY child
|
||||
// and we can import now.
|
||||
parent_block_hash != bid_block_hash
|
||||
} else {
|
||||
// A parent that's gloas imported and this lookup claims to be before gloas.
|
||||
debug_assert!(
|
||||
true,
|
||||
"Received post-gloas import action for pre-gloas lookup"
|
||||
);
|
||||
false
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
};
|
||||
if should_resolve {
|
||||
self.awaiting_parent = None;
|
||||
}
|
||||
should_resolve
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the time elapsed since this lookup was created
|
||||
@@ -318,12 +381,18 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_complete(&self) -> bool {
|
||||
self.block_request.is_complete()
|
||||
&& self.data_request.is_complete()
|
||||
&& self.payload_request.is_complete()
|
||||
}
|
||||
|
||||
/// Makes progress on all requests of this lookup. Any error is not recoverable and must result
|
||||
/// in dropping the lookup. May mark the lookup as completed.
|
||||
pub fn continue_requests(
|
||||
&mut self,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
) -> Result<(), LookupRequestError> {
|
||||
let _guard = self.span.clone().entered();
|
||||
|
||||
// === Block request ===
|
||||
@@ -425,17 +494,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
}
|
||||
}
|
||||
|
||||
// If all components of this lookup are already processed, there will be no future events
|
||||
// that can make progress so it must be dropped. Consider the lookup completed.
|
||||
// This case can happen if we receive the components from gossip during a retry.
|
||||
if self.block_request.is_complete()
|
||||
&& self.data_request.is_complete()
|
||||
&& self.payload_request.is_complete()
|
||||
{
|
||||
return Ok(LookupResult::Completed);
|
||||
}
|
||||
|
||||
Ok(LookupResult::Pending)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the peers that should serve this block's data columns and payload envelope. For FULL
|
||||
@@ -460,32 +519,33 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
/// Handle block processing result. Advances the lookup state machine.
|
||||
pub fn on_block_processing_result(
|
||||
&mut self,
|
||||
result: BlockProcessingResult,
|
||||
result: &BlockProcessingResult,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
) -> Result<(), LookupRequestError> {
|
||||
match result {
|
||||
BlockProcessingResult::Imported(_fully_imported, _info) => {
|
||||
self.block_request.state.on_processing_success()?;
|
||||
// TODO(gloas): Potentially continue child lookups for empty child
|
||||
// TODO(gloas): If no-one is waiting on this lookup clean it
|
||||
}
|
||||
BlockProcessingResult::ParentUnknown { parent_root } => {
|
||||
BlockProcessingResult::ParentUnknown {
|
||||
parent_root,
|
||||
parent_block_hash,
|
||||
} => {
|
||||
// `BlockError::ParentUnknown` is only returned when processing blocks. Revert the
|
||||
// block request to `Downloaded` and park this lookup until the parent resolves; a
|
||||
// future call to `continue_requests` will re-submit the block for processing once
|
||||
// the parent lookup completes.
|
||||
let parent_block_hash = self.bid_parent_block_hash();
|
||||
self.block_request.state.revert_to_awaiting_processing()?;
|
||||
self.set_awaiting_parent(parent_root);
|
||||
return Ok(LookupResult::ParentUnknown {
|
||||
parent_root,
|
||||
parent_block_hash,
|
||||
block_root: self.block_root,
|
||||
peers: self.all_peers(),
|
||||
self.set_awaiting_parent(AwaitingParent {
|
||||
parent_root: *parent_root,
|
||||
parent_block_hash: *parent_block_hash,
|
||||
});
|
||||
}
|
||||
BlockProcessingResult::Error { penalty, .. } => {
|
||||
let peers = self.block_request.state.on_processing_failure()?;
|
||||
if let Some((action, whom, msg)) = penalty {
|
||||
whom.apply(action, &peers, msg, cx);
|
||||
whom.apply(*action, &peers, msg, cx);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -495,9 +555,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
/// Handle data processing result
|
||||
pub fn on_data_processing_result(
|
||||
&mut self,
|
||||
result: BlockProcessingResult,
|
||||
result: &BlockProcessingResult,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
) -> Result<(), LookupRequestError> {
|
||||
let DataRequest::Request { state, .. } = &mut self.data_request else {
|
||||
return Err(LookupRequestError::BadState("no data_request".to_owned()));
|
||||
};
|
||||
@@ -514,47 +574,19 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
BlockProcessingResult::Error { penalty, .. } => {
|
||||
let peers = state.on_processing_failure()?;
|
||||
if let Some((action, whom, msg)) = penalty {
|
||||
whom.apply(action, &peers, msg, cx);
|
||||
whom.apply(*action, &peers, msg, cx);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.continue_requests(cx)
|
||||
}
|
||||
|
||||
/// Handle a block download response. Updates download state and advances the lookup.
|
||||
pub fn on_block_download_response(
|
||||
&mut self,
|
||||
req_id: ReqId,
|
||||
result: BlockDownloadResponse<T::EthSpec>,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
self.block_request
|
||||
.state
|
||||
.on_download_response(req_id, result)?;
|
||||
self.continue_requests(cx)
|
||||
}
|
||||
|
||||
/// Handle a custody columns download response. Updates download state and advances the lookup.
|
||||
pub fn on_custody_download_response(
|
||||
&mut self,
|
||||
req_id: ReqId,
|
||||
result: CustodyDownloadResponse<T::EthSpec>,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
let DataRequest::Request { state, .. } = &mut self.data_request else {
|
||||
return Err(LookupRequestError::BadState("no data_request".to_owned()));
|
||||
};
|
||||
|
||||
state.on_download_response(req_id, result)?;
|
||||
self.continue_requests(cx)
|
||||
}
|
||||
|
||||
/// Handle payload envelope processing result (Gloas only).
|
||||
pub fn on_payload_processing_result(
|
||||
&mut self,
|
||||
result: BlockProcessingResult,
|
||||
result: &BlockProcessingResult,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
) -> Result<(), LookupRequestError> {
|
||||
let PayloadRequest::Request { state, .. } = &mut self.payload_request else {
|
||||
return Err(LookupRequestError::BadState(
|
||||
"no payload_request".to_owned(),
|
||||
@@ -573,20 +605,48 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
BlockProcessingResult::Error { penalty, .. } => {
|
||||
let peers = state.on_processing_failure()?;
|
||||
if let Some((action, whom, msg)) = penalty {
|
||||
whom.apply(action, &peers, msg, cx);
|
||||
whom.apply(*action, &peers, msg, cx);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.continue_requests(cx)
|
||||
}
|
||||
|
||||
/// Handle a block download response. Updates download state and advances the lookup.
|
||||
pub fn on_block_download_response(
|
||||
&mut self,
|
||||
req_id: ReqId,
|
||||
result: BlockDownloadResponse<T::EthSpec>,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<(), LookupRequestError> {
|
||||
self.block_request
|
||||
.state
|
||||
.on_download_response(req_id, result)?;
|
||||
self.continue_requests(cx)
|
||||
}
|
||||
|
||||
/// Handle a custody columns download response. Updates download state and advances the lookup.
|
||||
pub fn on_custody_download_response(
|
||||
&mut self,
|
||||
req_id: ReqId,
|
||||
result: CustodyDownloadResponse<T::EthSpec>,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<(), LookupRequestError> {
|
||||
let DataRequest::Request { state, .. } = &mut self.data_request else {
|
||||
return Err(LookupRequestError::BadState("no data_request".to_owned()));
|
||||
};
|
||||
|
||||
state.on_download_response(req_id, result)?;
|
||||
self.continue_requests(cx)
|
||||
}
|
||||
|
||||
/// Handle a payload envelope download response. Updates download state and advances the lookup.
|
||||
pub fn on_payload_download_response(
|
||||
&mut self,
|
||||
req_id: ReqId,
|
||||
result: PayloadDownloadResponse<T::EthSpec>,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
) -> Result<(), LookupRequestError> {
|
||||
let PayloadRequest::Request { state, .. } = &mut self.payload_request else {
|
||||
return Err(LookupRequestError::BadState(
|
||||
"no payload_request".to_owned(),
|
||||
|
||||
Reference in New Issue
Block a user