From a0a62ea3e14385d529af2be19bf2487e58db2290 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 11 Oct 2024 02:44:18 +0300 Subject: [PATCH] Prevent sync lookups from reverting to awaiting block (#6443) * Prevent sync lookups from reverting to awaiting block * Remove stale comment --- .../network/src/sync/block_lookups/common.rs | 38 +++--- .../network/src/sync/block_lookups/mod.rs | 8 +- .../sync/block_lookups/single_block_lookup.rs | 117 +++++++++++++++--- .../network/src/sync/network_context.rs | 67 +--------- 4 files changed, 128 insertions(+), 102 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index c7c043f53f..5e336d9c38 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -13,7 +13,7 @@ use std::sync::Arc; use types::blob_sidecar::FixedBlobSidecarList; use types::{DataColumnSidecarList, SignedBeaconBlock}; -use super::single_block_lookup::DownloadResult; +use super::single_block_lookup::{ComponentRequests, DownloadResult}; use super::SingleLookupId; #[derive(Debug, Copy, Clone)] @@ -42,7 +42,7 @@ pub trait RequestState { &self, id: Id, peer_id: PeerId, - downloaded_block: Option>>, + expected_blobs: usize, cx: &mut SyncNetworkContext, ) -> Result; @@ -61,7 +61,7 @@ pub trait RequestState { fn response_type() -> ResponseType; /// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait. - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self; + fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str>; /// A getter for a reference to the `SingleLookupRequestState` associated with this trait. fn get_state(&self) -> &SingleLookupRequestState; @@ -77,7 +77,7 @@ impl RequestState for BlockRequestState { &self, id: SingleLookupId, peer_id: PeerId, - _: Option>>, + _: usize, cx: &mut SyncNetworkContext, ) -> Result { cx.block_lookup_request(id, peer_id, self.requested_block_root) @@ -107,8 +107,8 @@ impl RequestState for BlockRequestState { fn response_type() -> ResponseType { ResponseType::Block } - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { - &mut request.block_request_state + fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { + Ok(&mut request.block_request_state) } fn get_state(&self) -> &SingleLookupRequestState { &self.state @@ -125,10 +125,10 @@ impl RequestState for BlobRequestState { &self, id: Id, peer_id: PeerId, - downloaded_block: Option>>, + expected_blobs: usize, cx: &mut SyncNetworkContext, ) -> Result { - cx.blob_lookup_request(id, peer_id, self.block_root, downloaded_block) + cx.blob_lookup_request(id, peer_id, self.block_root, expected_blobs) .map_err(LookupRequestError::SendFailedNetwork) } @@ -150,8 +150,13 @@ impl RequestState for BlobRequestState { fn response_type() -> ResponseType { ResponseType::Blob } - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { - &mut request.blob_request_state + fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { + match &mut request.component_requests { + ComponentRequests::WaitingForBlock => Err("waiting for block"), + ComponentRequests::ActiveBlobRequest(request, _) => Ok(request), + ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"), + ComponentRequests::NotNeeded { .. } => Err("not needed"), + } } fn get_state(&self) -> &SingleLookupRequestState { &self.state @@ -169,10 +174,10 @@ impl RequestState for CustodyRequestState { id: Id, // TODO(das): consider selecting peers that have custody but are in this set _peer_id: PeerId, - downloaded_block: Option>>, + _: usize, cx: &mut SyncNetworkContext, ) -> Result { - cx.custody_lookup_request(id, self.block_root, downloaded_block) + cx.custody_lookup_request(id, self.block_root) .map_err(LookupRequestError::SendFailedNetwork) } @@ -200,8 +205,13 @@ impl RequestState for CustodyRequestState { fn response_type() -> ResponseType { ResponseType::CustodyColumn } - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { - &mut request.custody_request_state + fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { + match &mut request.component_requests { + ComponentRequests::WaitingForBlock => Err("waiting for block"), + ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"), + ComponentRequests::ActiveCustodyRequest(request) => Ok(request), + ComponentRequests::NotNeeded { .. } => Err("not needed"), + } } fn get_state(&self) -> &SingleLookupRequestState { &self.state diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index a89f533ecc..f5e68d1512 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -450,7 +450,9 @@ impl BlockLookups { }; let block_root = lookup.block_root(); - let request_state = R::request_state_mut(lookup).get_state_mut(); + let request_state = R::request_state_mut(lookup) + .map_err(|e| LookupRequestError::BadState(e.to_owned()))? + .get_state_mut(); match response { Ok((response, peer_group, seen_timestamp)) => { @@ -545,7 +547,9 @@ impl BlockLookups { }; let block_root = lookup.block_root(); - let request_state = R::request_state_mut(lookup).get_state_mut(); + let request_state = R::request_state_mut(lookup) + .map_err(|e| LookupRequestError::BadState(e.to_owned()))? + .get_state_mut(); debug!( self.log, diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 4e7268a72a..d701cbbb8d 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -4,7 +4,7 @@ use crate::sync::network_context::{ LookupRequestResult, PeerGroup, ReqId, RpcRequestSendError, SendErrorProcessor, SyncNetworkContext, }; -use beacon_chain::BeaconChainTypes; +use beacon_chain::{BeaconChainTypes, BlockProcessStatus}; use derivative::Derivative; use lighthouse_network::service::api_types::Id; use rand::seq::IteratorRandom; @@ -62,8 +62,7 @@ pub enum LookupRequestError { pub struct SingleBlockLookup { pub id: Id, pub block_request_state: BlockRequestState, - pub blob_request_state: BlobRequestState, - pub custody_request_state: CustodyRequestState, + pub component_requests: ComponentRequests, /// Peers that claim to have imported this set of block components #[derivative(Debug(format_with = "fmt_peer_set_as_len"))] peers: HashSet, @@ -72,6 +71,16 @@ pub struct SingleBlockLookup { created: Instant, } +#[derive(Debug)] +pub(crate) enum ComponentRequests { + WaitingForBlock, + ActiveBlobRequest(BlobRequestState, usize), + ActiveCustodyRequest(CustodyRequestState), + // When printing in debug this state display the reason why it's not needed + #[allow(dead_code)] + NotNeeded(&'static str), +} + impl SingleBlockLookup { pub fn new( requested_block_root: Hash256, @@ -82,8 +91,7 @@ impl SingleBlockLookup { Self { id, block_request_state: BlockRequestState::new(requested_block_root), - blob_request_state: BlobRequestState::new(requested_block_root), - custody_request_state: CustodyRequestState::new(requested_block_root), + component_requests: ComponentRequests::WaitingForBlock, peers: HashSet::from_iter(peers.iter().copied()), block_root: requested_block_root, awaiting_parent, @@ -150,16 +158,28 @@ impl SingleBlockLookup { /// Returns true if the block has already been downloaded. pub fn all_components_processed(&self) -> bool { self.block_request_state.state.is_processed() - && self.blob_request_state.state.is_processed() - && self.custody_request_state.state.is_processed() + && match &self.component_requests { + ComponentRequests::WaitingForBlock => false, + ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(), + ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(), + ComponentRequests::NotNeeded { .. } => true, + } } /// Returns true if this request is expecting some event to make progress pub fn is_awaiting_event(&self) -> bool { self.awaiting_parent.is_some() || self.block_request_state.state.is_awaiting_event() - || self.blob_request_state.state.is_awaiting_event() - || self.custody_request_state.state.is_awaiting_event() + || match &self.component_requests { + ComponentRequests::WaitingForBlock => true, + ComponentRequests::ActiveBlobRequest(request, _) => { + request.state.is_awaiting_event() + } + ComponentRequests::ActiveCustodyRequest(request) => { + request.state.is_awaiting_event() + } + ComponentRequests::NotNeeded { .. } => false, + } } /// Makes progress on all requests of this lookup. Any error is not recoverable and must result @@ -169,9 +189,66 @@ impl SingleBlockLookup { cx: &mut SyncNetworkContext, ) -> Result { // TODO: Check what's necessary to download, specially for blobs - self.continue_request::>(cx)?; - self.continue_request::>(cx)?; - self.continue_request::>(cx)?; + self.continue_request::>(cx, 0)?; + + if let ComponentRequests::WaitingForBlock = self.component_requests { + let downloaded_block = self + .block_request_state + .state + .peek_downloaded_data() + .cloned(); + + if let Some(block) = downloaded_block.or_else(|| { + // If the block is already being processed or fully validated, retrieve how many blobs + // it expects. Consider any stage of the block. If the block root has been validated, we + // can assert that this is the correct value of `blob_kzg_commitments_count`. + match cx.chain.get_block_process_status(&self.block_root) { + BlockProcessStatus::Unknown => None, + BlockProcessStatus::NotValidated(block) + | BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()), + } + }) { + let expected_blobs = block.num_expected_blobs(); + let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); + if expected_blobs == 0 { + self.component_requests = ComponentRequests::NotNeeded("no data"); + } + if cx.chain.should_fetch_blobs(block_epoch) { + self.component_requests = ComponentRequests::ActiveBlobRequest( + BlobRequestState::new(self.block_root), + expected_blobs, + ); + } else if cx.chain.should_fetch_custody_columns(block_epoch) { + self.component_requests = ComponentRequests::ActiveCustodyRequest( + CustodyRequestState::new(self.block_root), + ); + } else { + self.component_requests = ComponentRequests::NotNeeded("outside da window"); + } + } else { + // Wait to download the block before downloading blobs. Then we can be sure that the + // block has data, so there's no need to do "blind" requests for all possible blobs and + // latter handle the case where if the peer sent no blobs, penalize. + // + // Lookup sync event safety: Reaching this code means that a block is not in any pre-import + // cache nor in the request state of this lookup. Therefore, the block must either: (1) not + // be downloaded yet or (2) the block is already imported into the fork-choice. + // In case (1) the lookup must either successfully download the block or get dropped. + // In case (2) the block will be downloaded, processed, reach `DuplicateFullyImported` + // and get dropped as completed. + } + } + + match &self.component_requests { + ComponentRequests::WaitingForBlock => {} // do nothing + ComponentRequests::ActiveBlobRequest(_, expected_blobs) => { + self.continue_request::>(cx, *expected_blobs)? + } + ComponentRequests::ActiveCustodyRequest(_) => { + self.continue_request::>(cx, 0)? + } + ComponentRequests::NotNeeded { .. } => {} // do nothing + } // If all components of this lookup are already processed, there will be no future events // that can make progress so it must be dropped. Consider the lookup completed. @@ -187,15 +264,12 @@ impl SingleBlockLookup { fn continue_request>( &mut self, cx: &mut SyncNetworkContext, + expected_blobs: usize, ) -> Result<(), LookupRequestError> { let id = self.id; let awaiting_parent = self.awaiting_parent.is_some(); - let downloaded_block = self - .block_request_state - .state - .peek_downloaded_data() - .cloned(); - let request = R::request_state_mut(self); + let request = + R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?; // Attempt to progress awaiting downloads if request.get_state().is_awaiting_download() { @@ -214,13 +288,16 @@ impl SingleBlockLookup { // not receive any new peers for some time it will be dropped. If it receives a new // peer it must attempt to make progress. R::request_state_mut(self) + .map_err(|e| LookupRequestError::BadState(e.to_owned()))? .get_state_mut() .update_awaiting_download_status("no peers"); return Ok(()); }; - let request = R::request_state_mut(self); - match request.make_request(id, peer_id, downloaded_block, cx)? { + let request = R::request_state_mut(self) + .map_err(|e| LookupRequestError::BadState(e.to_owned()))?; + + match request.make_request(id, peer_id, expected_blobs, cx)? { LookupRequestResult::RequestSent(req_id) => { // Lookup sync event safety: If make_request returns `RequestSent`, we are // guaranteed that `BlockLookups::on_download_response` will be called exactly diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index b67c0bf2dd..9f9a189817 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -632,45 +632,8 @@ impl SyncNetworkContext { lookup_id: SingleLookupId, peer_id: PeerId, block_root: Hash256, - downloaded_block: Option>>, + expected_blobs: usize, ) -> Result { - let Some(block) = downloaded_block.or_else(|| { - // If the block is already being processed or fully validated, retrieve how many blobs - // it expects. Consider any stage of the block. If the block root has been validated, we - // can assert that this is the correct value of `blob_kzg_commitments_count`. - match self.chain.get_block_process_status(&block_root) { - BlockProcessStatus::Unknown => None, - BlockProcessStatus::NotValidated(block) - | BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()), - } - }) else { - // Wait to download the block before downloading blobs. Then we can be sure that the - // block has data, so there's no need to do "blind" requests for all possible blobs and - // latter handle the case where if the peer sent no blobs, penalize. - // - if `downloaded_block_expected_blobs` is Some = block is downloading or processing. - // - if `num_expected_blobs` returns Some = block is processed. - // - // Lookup sync event safety: Reaching this code means that a block is not in any pre-import - // cache nor in the request state of this lookup. Therefore, the block must either: (1) not - // be downloaded yet or (2) the block is already imported into the fork-choice. - // In case (1) the lookup must either successfully download the block or get dropped. - // In case (2) the block will be downloaded, processed, reach `DuplicateFullyImported` - // and get dropped as completed. - return Ok(LookupRequestResult::Pending("waiting for block download")); - }; - let expected_blobs = block.num_expected_blobs(); - let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); - - // Check if we are in deneb, before peerdas and inside da window - if !self.chain.should_fetch_blobs(block_epoch) { - return Ok(LookupRequestResult::NoRequestNeeded("blobs not required")); - } - - // No data required for this block - if expected_blobs == 0 { - return Ok(LookupRequestResult::NoRequestNeeded("no data")); - } - let imported_blob_indexes = self .chain .data_availability_checker @@ -760,35 +723,7 @@ impl SyncNetworkContext { &mut self, lookup_id: SingleLookupId, block_root: Hash256, - downloaded_block: Option>>, ) -> Result { - let Some(block) = - downloaded_block.or_else(|| match self.chain.get_block_process_status(&block_root) { - BlockProcessStatus::Unknown => None, - BlockProcessStatus::NotValidated(block) - | BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()), - }) - else { - // Wait to download the block before downloading columns. Then we can be sure that the - // block has data, so there's no need to do "blind" requests for all possible columns and - // latter handle the case where if the peer sent no columns, penalize. - // - if `downloaded_block_expected_blobs` is Some = block is downloading or processing. - // - if `num_expected_blobs` returns Some = block is processed. - return Ok(LookupRequestResult::Pending("waiting for block download")); - }; - let expected_blobs = block.num_expected_blobs(); - let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); - - // Check if we are into peerdas and inside da window - if !self.chain.should_fetch_custody_columns(block_epoch) { - return Ok(LookupRequestResult::NoRequestNeeded("columns not required")); - } - - // No data required for this block - if expected_blobs == 0 { - return Ok(LookupRequestResult::NoRequestNeeded("no data")); - } - let custody_indexes_imported = self .chain .data_availability_checker