diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 5e39c9bc1e..528a261bb8 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -962,7 +962,7 @@ impl NetworkBeaconProcessor { /// The classified outcome of submitting a block / blob / column for processing, ready for the /// lookup state machine to act on without re-inspecting `BlockError`. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum BlockProcessingResult { /// `fully_imported` is true if the lookup is complete; false if `MissingComponents` (the /// lookup must keep fetching). `info` is a stable label for logs / metrics. diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index c656aab4ad..03c7fc8195 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -29,7 +29,9 @@ use crate::metrics; use crate::network_beacon_processor::BlockProcessingResult; use crate::sync::SyncMessage; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; -use crate::sync::block_lookups::single_block_lookup::{AwaitingParent, ImportedAction}; +use crate::sync::block_lookups::single_block_lookup::{ + AwaitingParent, ImportedParent, LookupResult, +}; use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; use lighthouse_network::PeerId; @@ -414,7 +416,7 @@ impl BlockLookups { self.metrics.created_lookups += 1; let result = lookup.continue_requests(cx); - self.on_lookup_result(id, result, "new_current_lookup"); + self.on_lookup_result(id, result, "new_current_lookup", cx); self.update_metrics(); self.single_block_lookups.contains_key(&id) } @@ -433,7 +435,7 @@ impl BlockLookups { return; }; let result = lookup.on_block_download_response(id.req_id, response, cx); - self.on_lookup_result(id.lookup_id, result, "block_download_response"); + self.on_lookup_result(id.lookup_id, result, "block_download_response", cx); } pub fn on_custody_download_response( @@ -447,7 +449,7 @@ impl BlockLookups { return; }; let result = lookup.on_custody_download_response(id.req_id, response, cx); - self.on_lookup_result(id.lookup_id, result, "custody_download_response"); + self.on_lookup_result(id.lookup_id, result, "custody_download_response", cx); } pub fn on_payload_download_response( @@ -464,7 +466,7 @@ impl BlockLookups { return; }; let result = lookup.on_payload_download_response(id.req_id, response, cx); - self.on_lookup_result(id.lookup_id, result, "payload_download_response"); + self.on_lookup_result(id.lookup_id, result, "payload_download_response", cx); } /* Error responses */ @@ -502,73 +504,39 @@ impl BlockLookups { ); let lookup_result = match process_type { - BlockProcessType::SingleBlock { .. } => lookup.on_block_processing_result(&result, cx), - BlockProcessType::SingleCustodyColumn(_) => { - lookup.on_data_processing_result(&result, cx) - } - BlockProcessType::SinglePayloadEnvelope(_) => { - lookup.on_payload_processing_result(&result, cx) - } - }; - - match &result { - BlockProcessingResult::Imported(..) => { - // Some component got imported potentially continue - if lookup.is_complete() { - if self.single_block_lookups.remove(&id).is_some() { - debug!(?block_root, id, "Dropping completed lookup"); - metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED); - self.metrics.completed_lookups += 1; - // Block imported, continue the requests of pending child blocks - self.continue_child_lookups(block_root, ImportedAction::LookupComplete, cx); - self.update_metrics(); - } else { - debug!(id, "Attempting to drop non-existent lookup"); - } - } else if matches!(process_type, BlockProcessType::SingleBlock { .. }) + BlockProcessType::SingleBlock { .. } => { + // Update the result of the lookup first, here we may start the download of Gloas + // payload, which may error. + let lookup_result = lookup.on_block_processing_result(result.clone(), cx); + let lookup_is_awaiting_event = lookup.is_awaiting_event(); + // Then, as a side-effect continue the EMPTY children of this lookup. Only if the + // block just imported which ensures we just do it once per lookup. + if let BlockProcessingResult::Imported(..) = result && let Some(bid_block_hash) = lookup.peek_downloaded_bid_block_hash() { - // For post-Gloas blocks, if the block just became imported attempt to make - // progress on its EMPTY children. Then, if there are no FULL children, remove - // the lookup. self.continue_child_lookups( block_root, - ImportedAction::GloasBlockComplete(bid_block_hash), + ImportedParent::OnlyGloasBlock(bid_block_hash), cx, ); - if !self.has_any_awaiting_children(block_root) { - self.single_block_lookups.remove(&id); - metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED); - self.metrics.completed_lookups += 1; - debug!( - ?block_root, - id, "Dropping completed lookup after block import" - ); - } - self.update_metrics(); + } + // Then if this lookup happens to have only empty children we can remove it now. We + // must make sure that no other lookup is awaiting this one, and that no requests + // are on-going. + if !lookup_is_awaiting_event && !self.has_any_awaiting_children(block_root) { + Ok(LookupResult::Completed) + } else { + lookup_result } } - BlockProcessingResult::ParentUnknown { - parent_root, - parent_block_hash, - } => { - // Parent unknown error, create parent lookup - let peers = lookup.all_peers(); - if !self.search_parent_of_child( - *parent_root, - &PeerType::new(*parent_block_hash), - block_root, - &peers, - cx, - ) { - self.drop_lookup_and_children(id, "Failed"); - self.update_metrics(); - } + BlockProcessType::SingleCustodyColumn(_) => { + lookup.on_data_processing_result(result, cx) } - BlockProcessingResult::Error { .. } => {} - } - - self.on_lookup_result(id, lookup_result, "processing_result"); + BlockProcessType::SinglePayloadEnvelope(_) => { + lookup.on_payload_processing_result(result, cx) + } + }; + self.on_lookup_result(id, lookup_result, "processing_result", cx); } pub fn has_any_awaiting_children(&self, block_root: Hash256) -> bool { @@ -581,15 +549,15 @@ impl BlockLookups { pub fn continue_child_lookups( &mut self, parent_root: Hash256, - import_action: ImportedAction, + imported_parent: ImportedParent, cx: &mut SyncNetworkContext, ) { let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self for (id, lookup) in self.single_block_lookups.iter_mut() { - if lookup.maybe_resolve_awaiting_parent(parent_root, import_action) { + if lookup.maybe_resolve_awaiting_parent(parent_root, imported_parent) { debug!( - ?import_action, + ?imported_parent, id, block_root = ?lookup.block_root(), "Continuing child lookup" @@ -600,7 +568,7 @@ impl BlockLookups { } for (id, result) in lookup_results { - self.on_lookup_result(id, result, "continue_child_lookups"); + self.on_lookup_result(id, result, "continue_child_lookups", cx); } } @@ -637,11 +605,51 @@ impl BlockLookups { fn on_lookup_result( &mut self, id: SingleLookupId, - result: Result<(), LookupRequestError>, + result: Result, source: &str, + cx: &mut SyncNetworkContext, ) { match result { - Ok(_) => {} + Ok(LookupResult::Pending) => {} + Ok(LookupResult::ParentUnknown { + parent_root, + parent_block_hash, + block_root, + peers, + }) => { + if self.search_parent_of_child( + parent_root, + &PeerType::new(parent_block_hash), + block_root, + &peers, + cx, + ) { + // + } else { + self.drop_lookup_and_children(id, "Failed"); + self.update_metrics(); + } + } + Ok(LookupResult::Completed) => { + if let Some(lookup) = self.single_block_lookups.remove(&id) { + debug!( + block = ?lookup.block_root(), + id, + "Dropping completed lookup" + ); + metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED); + self.metrics.completed_lookups += 1; + // Block imported, continue the requests of pending child blocks + self.continue_child_lookups( + lookup.block_root(), + ImportedParent::LookupComplete, + cx, + ); + self.update_metrics(); + } else { + debug!(id, "Attempting to drop non-existent lookup"); + } + } Err(error) => { debug!(id, source, ?error, "Dropping lookup on request error"); self.drop_lookup_and_children(id, error.into()); @@ -836,7 +844,7 @@ impl BlockLookups { // pruned with `drop_lookups_without_peers` because it has peers. This is rare corner // case, but it can result in stuck lookups. let result = lookup.continue_requests(cx); - self.on_lookup_result(lookup_id, result, "add_peers"); + self.on_lookup_result(lookup_id, result, "add_peers", cx); Ok(()) } else { Ok(()) diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index ff17db4322..59d025b5b3 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -24,6 +24,22 @@ use types::{ SignedExecutionPayloadEnvelope, Slot, }; +// Dedicated enum for LookupResult to force its usage +#[must_use = "LookupResult must be handled with on_lookup_result"] +pub enum LookupResult { + /// Lookup completed successfully + Completed, + /// Lookup is expecting some future event from the network + Pending, + /// Block's parent is not known to fork-choice, a parent lookup is needed + ParentUnknown { + parent_root: Hash256, + parent_block_hash: Option, + block_root: Hash256, + peers: Vec, + }, +} + #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupRequestError { /// Too many failed attempts @@ -164,9 +180,9 @@ impl From<&AwaitingParent> for PeerType { } #[derive(Debug, Clone, Copy)] -pub enum ImportedAction { +pub enum ImportedParent { LookupComplete, - GloasBlockComplete(ExecutionBlockHash), + OnlyGloasBlock(ExecutionBlockHash), } #[derive(Educe)] @@ -283,18 +299,18 @@ impl SingleBlockLookup { /// processing. pub fn maybe_resolve_awaiting_parent( &mut self, - block_root: Hash256, - action: ImportedAction, + parent_root: Hash256, + imported_parent: ImportedParent, ) -> bool { let Some(awaiting_parent) = self.awaiting_parent else { return false; }; - if awaiting_parent.parent_root() != block_root { + if awaiting_parent.parent_root() != parent_root { return false; } - let should_resolve = match action { - ImportedAction::LookupComplete => true, - ImportedAction::GloasBlockComplete(bid_block_hash) => { + let should_resolve = match imported_parent { + ImportedParent::LookupComplete => true, + ImportedParent::OnlyGloasBlock(bid_block_hash) => { if let Some(parent_block_hash) = awaiting_parent.parent_block_hash { // This lookup is the execution child of `parent_execution_hash`. If the // parent hash the same `bid_block_hash` this is FULL child and we must wait @@ -354,18 +370,12 @@ impl SingleBlockLookup { } } - pub fn is_complete(&self) -> bool { - self.block_request.is_complete() - && self.data_request.is_complete() - && self.payload_request.is_complete() - } - /// Makes progress on all requests of this lookup. Any error is not recoverable and must result /// in dropping the lookup. May mark the lookup as completed. pub fn continue_requests( &mut self, cx: &mut SyncNetworkContext, - ) -> Result<(), LookupRequestError> { + ) -> Result { let _guard = self.span.clone().entered(); // === Block request === @@ -467,7 +477,17 @@ impl SingleBlockLookup { } } - Ok(()) + // If all components of this lookup are already processed, there will be no future events + // that can make progress so it must be dropped. Consider the lookup completed. + // This case can happen if we receive the components from gossip during a retry. + if self.block_request.is_complete() + && self.data_request.is_complete() + && self.payload_request.is_complete() + { + return Ok(LookupResult::Completed); + } + + Ok(LookupResult::Pending) } /// Returns the peers that should serve this block's data columns and payload envelope. For FULL @@ -492,9 +512,9 @@ impl SingleBlockLookup { /// Handle block processing result. Advances the lookup state machine. pub fn on_block_processing_result( &mut self, - result: &BlockProcessingResult, + result: BlockProcessingResult, cx: &mut SyncNetworkContext, - ) -> Result<(), LookupRequestError> { + ) -> Result { match result { BlockProcessingResult::Imported(_fully_imported, _info) => { self.block_request.state.on_processing_success()?; @@ -509,14 +529,20 @@ impl SingleBlockLookup { // the parent lookup completes. self.block_request.state.revert_to_awaiting_processing()?; self.set_awaiting_parent(AwaitingParent { - parent_root: *parent_root, - parent_block_hash: *parent_block_hash, + parent_root, + parent_block_hash, + }); + return Ok(LookupResult::ParentUnknown { + parent_root, + parent_block_hash, + block_root: self.block_root, + peers: self.all_peers(), }); } BlockProcessingResult::Error { penalty, .. } => { let peers = self.block_request.state.on_processing_failure()?; if let Some((action, whom, msg)) = penalty { - whom.apply(*action, &peers, msg, cx); + whom.apply(action, &peers, msg, cx); } } } @@ -526,9 +552,9 @@ impl SingleBlockLookup { /// Handle data processing result pub fn on_data_processing_result( &mut self, - result: &BlockProcessingResult, + result: BlockProcessingResult, cx: &mut SyncNetworkContext, - ) -> Result<(), LookupRequestError> { + ) -> Result { let DataRequest::Request { state, .. } = &mut self.data_request else { return Err(LookupRequestError::BadState("no data_request".to_owned())); }; @@ -545,7 +571,7 @@ impl SingleBlockLookup { BlockProcessingResult::Error { penalty, .. } => { let peers = state.on_processing_failure()?; if let Some((action, whom, msg)) = penalty { - whom.apply(*action, &peers, msg, cx); + whom.apply(action, &peers, msg, cx); } } } @@ -555,9 +581,9 @@ impl SingleBlockLookup { /// Handle payload envelope processing result (Gloas only). pub fn on_payload_processing_result( &mut self, - result: &BlockProcessingResult, + result: BlockProcessingResult, cx: &mut SyncNetworkContext, - ) -> Result<(), LookupRequestError> { + ) -> Result { let PayloadRequest::Request { state, .. } = &mut self.payload_request else { return Err(LookupRequestError::BadState( "no payload_request".to_owned(), @@ -576,7 +602,7 @@ impl SingleBlockLookup { BlockProcessingResult::Error { penalty, .. } => { let peers = state.on_processing_failure()?; if let Some((action, whom, msg)) = penalty { - whom.apply(*action, &peers, msg, cx); + whom.apply(action, &peers, msg, cx); } } } @@ -589,7 +615,7 @@ impl SingleBlockLookup { req_id: ReqId, result: BlockDownloadResponse, cx: &mut SyncNetworkContext, - ) -> Result<(), LookupRequestError> { + ) -> Result { self.block_request .state .on_download_response(req_id, result)?; @@ -602,7 +628,7 @@ impl SingleBlockLookup { req_id: ReqId, result: CustodyDownloadResponse, cx: &mut SyncNetworkContext, - ) -> Result<(), LookupRequestError> { + ) -> Result { let DataRequest::Request { state, .. } = &mut self.data_request else { return Err(LookupRequestError::BadState("no data_request".to_owned())); }; @@ -617,7 +643,7 @@ impl SingleBlockLookup { req_id: ReqId, result: PayloadDownloadResponse, cx: &mut SyncNetworkContext, - ) -> Result<(), LookupRequestError> { + ) -> Result { let PayloadRequest::Request { state, .. } = &mut self.payload_request else { return Err(LookupRequestError::BadState( "no payload_request".to_owned(),