mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-06 18:21:45 +00:00
Prevent sync lookups from reverting to awaiting block (#6443)
* Prevent sync lookups from reverting to awaiting block * Remove stale comment
This commit is contained in:
@@ -13,7 +13,7 @@ use std::sync::Arc;
|
||||
use types::blob_sidecar::FixedBlobSidecarList;
|
||||
use types::{DataColumnSidecarList, SignedBeaconBlock};
|
||||
|
||||
use super::single_block_lookup::DownloadResult;
|
||||
use super::single_block_lookup::{ComponentRequests, DownloadResult};
|
||||
use super::SingleLookupId;
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
@@ -42,7 +42,7 @@ pub trait RequestState<T: BeaconChainTypes> {
|
||||
&self,
|
||||
id: Id,
|
||||
peer_id: PeerId,
|
||||
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
expected_blobs: usize,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupRequestResult, LookupRequestError>;
|
||||
|
||||
@@ -61,7 +61,7 @@ pub trait RequestState<T: BeaconChainTypes> {
|
||||
fn response_type() -> ResponseType;
|
||||
|
||||
/// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait.
|
||||
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self;
|
||||
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str>;
|
||||
|
||||
/// A getter for a reference to the `SingleLookupRequestState` associated with this trait.
|
||||
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType>;
|
||||
@@ -77,7 +77,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
|
||||
&self,
|
||||
id: SingleLookupId,
|
||||
peer_id: PeerId,
|
||||
_: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
_: usize,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupRequestResult, LookupRequestError> {
|
||||
cx.block_lookup_request(id, peer_id, self.requested_block_root)
|
||||
@@ -107,8 +107,8 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
|
||||
fn response_type() -> ResponseType {
|
||||
ResponseType::Block
|
||||
}
|
||||
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
|
||||
&mut request.block_request_state
|
||||
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
|
||||
Ok(&mut request.block_request_state)
|
||||
}
|
||||
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
|
||||
&self.state
|
||||
@@ -125,10 +125,10 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
|
||||
&self,
|
||||
id: Id,
|
||||
peer_id: PeerId,
|
||||
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
expected_blobs: usize,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupRequestResult, LookupRequestError> {
|
||||
cx.blob_lookup_request(id, peer_id, self.block_root, downloaded_block)
|
||||
cx.blob_lookup_request(id, peer_id, self.block_root, expected_blobs)
|
||||
.map_err(LookupRequestError::SendFailedNetwork)
|
||||
}
|
||||
|
||||
@@ -150,8 +150,13 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
|
||||
fn response_type() -> ResponseType {
|
||||
ResponseType::Blob
|
||||
}
|
||||
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
|
||||
&mut request.blob_request_state
|
||||
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
|
||||
match &mut request.component_requests {
|
||||
ComponentRequests::WaitingForBlock => Err("waiting for block"),
|
||||
ComponentRequests::ActiveBlobRequest(request, _) => Ok(request),
|
||||
ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"),
|
||||
ComponentRequests::NotNeeded { .. } => Err("not needed"),
|
||||
}
|
||||
}
|
||||
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
|
||||
&self.state
|
||||
@@ -169,10 +174,10 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
|
||||
id: Id,
|
||||
// TODO(das): consider selecting peers that have custody but are in this set
|
||||
_peer_id: PeerId,
|
||||
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
_: usize,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupRequestResult, LookupRequestError> {
|
||||
cx.custody_lookup_request(id, self.block_root, downloaded_block)
|
||||
cx.custody_lookup_request(id, self.block_root)
|
||||
.map_err(LookupRequestError::SendFailedNetwork)
|
||||
}
|
||||
|
||||
@@ -200,8 +205,13 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
|
||||
fn response_type() -> ResponseType {
|
||||
ResponseType::CustodyColumn
|
||||
}
|
||||
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
|
||||
&mut request.custody_request_state
|
||||
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
|
||||
match &mut request.component_requests {
|
||||
ComponentRequests::WaitingForBlock => Err("waiting for block"),
|
||||
ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"),
|
||||
ComponentRequests::ActiveCustodyRequest(request) => Ok(request),
|
||||
ComponentRequests::NotNeeded { .. } => Err("not needed"),
|
||||
}
|
||||
}
|
||||
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
|
||||
&self.state
|
||||
|
||||
@@ -450,7 +450,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
};
|
||||
|
||||
let block_root = lookup.block_root();
|
||||
let request_state = R::request_state_mut(lookup).get_state_mut();
|
||||
let request_state = R::request_state_mut(lookup)
|
||||
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?
|
||||
.get_state_mut();
|
||||
|
||||
match response {
|
||||
Ok((response, peer_group, seen_timestamp)) => {
|
||||
@@ -545,7 +547,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
};
|
||||
|
||||
let block_root = lookup.block_root();
|
||||
let request_state = R::request_state_mut(lookup).get_state_mut();
|
||||
let request_state = R::request_state_mut(lookup)
|
||||
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?
|
||||
.get_state_mut();
|
||||
|
||||
debug!(
|
||||
self.log,
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::sync::network_context::{
|
||||
LookupRequestResult, PeerGroup, ReqId, RpcRequestSendError, SendErrorProcessor,
|
||||
SyncNetworkContext,
|
||||
};
|
||||
use beacon_chain::BeaconChainTypes;
|
||||
use beacon_chain::{BeaconChainTypes, BlockProcessStatus};
|
||||
use derivative::Derivative;
|
||||
use lighthouse_network::service::api_types::Id;
|
||||
use rand::seq::IteratorRandom;
|
||||
@@ -62,8 +62,7 @@ pub enum LookupRequestError {
|
||||
pub struct SingleBlockLookup<T: BeaconChainTypes> {
|
||||
pub id: Id,
|
||||
pub block_request_state: BlockRequestState<T::EthSpec>,
|
||||
pub blob_request_state: BlobRequestState<T::EthSpec>,
|
||||
pub custody_request_state: CustodyRequestState<T::EthSpec>,
|
||||
pub component_requests: ComponentRequests<T::EthSpec>,
|
||||
/// Peers that claim to have imported this set of block components
|
||||
#[derivative(Debug(format_with = "fmt_peer_set_as_len"))]
|
||||
peers: HashSet<PeerId>,
|
||||
@@ -72,6 +71,16 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
|
||||
created: Instant,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum ComponentRequests<E: EthSpec> {
|
||||
WaitingForBlock,
|
||||
ActiveBlobRequest(BlobRequestState<E>, usize),
|
||||
ActiveCustodyRequest(CustodyRequestState<E>),
|
||||
// When printing in debug this state display the reason why it's not needed
|
||||
#[allow(dead_code)]
|
||||
NotNeeded(&'static str),
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
pub fn new(
|
||||
requested_block_root: Hash256,
|
||||
@@ -82,8 +91,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
Self {
|
||||
id,
|
||||
block_request_state: BlockRequestState::new(requested_block_root),
|
||||
blob_request_state: BlobRequestState::new(requested_block_root),
|
||||
custody_request_state: CustodyRequestState::new(requested_block_root),
|
||||
component_requests: ComponentRequests::WaitingForBlock,
|
||||
peers: HashSet::from_iter(peers.iter().copied()),
|
||||
block_root: requested_block_root,
|
||||
awaiting_parent,
|
||||
@@ -150,16 +158,28 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
/// Returns true if the block has already been downloaded.
|
||||
pub fn all_components_processed(&self) -> bool {
|
||||
self.block_request_state.state.is_processed()
|
||||
&& self.blob_request_state.state.is_processed()
|
||||
&& self.custody_request_state.state.is_processed()
|
||||
&& match &self.component_requests {
|
||||
ComponentRequests::WaitingForBlock => false,
|
||||
ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(),
|
||||
ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(),
|
||||
ComponentRequests::NotNeeded { .. } => true,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if this request is expecting some event to make progress
|
||||
pub fn is_awaiting_event(&self) -> bool {
|
||||
self.awaiting_parent.is_some()
|
||||
|| self.block_request_state.state.is_awaiting_event()
|
||||
|| self.blob_request_state.state.is_awaiting_event()
|
||||
|| self.custody_request_state.state.is_awaiting_event()
|
||||
|| match &self.component_requests {
|
||||
ComponentRequests::WaitingForBlock => true,
|
||||
ComponentRequests::ActiveBlobRequest(request, _) => {
|
||||
request.state.is_awaiting_event()
|
||||
}
|
||||
ComponentRequests::ActiveCustodyRequest(request) => {
|
||||
request.state.is_awaiting_event()
|
||||
}
|
||||
ComponentRequests::NotNeeded { .. } => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Makes progress on all requests of this lookup. Any error is not recoverable and must result
|
||||
@@ -169,9 +189,66 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
// TODO: Check what's necessary to download, specially for blobs
|
||||
self.continue_request::<BlockRequestState<T::EthSpec>>(cx)?;
|
||||
self.continue_request::<BlobRequestState<T::EthSpec>>(cx)?;
|
||||
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx)?;
|
||||
self.continue_request::<BlockRequestState<T::EthSpec>>(cx, 0)?;
|
||||
|
||||
if let ComponentRequests::WaitingForBlock = self.component_requests {
|
||||
let downloaded_block = self
|
||||
.block_request_state
|
||||
.state
|
||||
.peek_downloaded_data()
|
||||
.cloned();
|
||||
|
||||
if let Some(block) = downloaded_block.or_else(|| {
|
||||
// If the block is already being processed or fully validated, retrieve how many blobs
|
||||
// it expects. Consider any stage of the block. If the block root has been validated, we
|
||||
// can assert that this is the correct value of `blob_kzg_commitments_count`.
|
||||
match cx.chain.get_block_process_status(&self.block_root) {
|
||||
BlockProcessStatus::Unknown => None,
|
||||
BlockProcessStatus::NotValidated(block)
|
||||
| BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()),
|
||||
}
|
||||
}) {
|
||||
let expected_blobs = block.num_expected_blobs();
|
||||
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
|
||||
if expected_blobs == 0 {
|
||||
self.component_requests = ComponentRequests::NotNeeded("no data");
|
||||
}
|
||||
if cx.chain.should_fetch_blobs(block_epoch) {
|
||||
self.component_requests = ComponentRequests::ActiveBlobRequest(
|
||||
BlobRequestState::new(self.block_root),
|
||||
expected_blobs,
|
||||
);
|
||||
} else if cx.chain.should_fetch_custody_columns(block_epoch) {
|
||||
self.component_requests = ComponentRequests::ActiveCustodyRequest(
|
||||
CustodyRequestState::new(self.block_root),
|
||||
);
|
||||
} else {
|
||||
self.component_requests = ComponentRequests::NotNeeded("outside da window");
|
||||
}
|
||||
} else {
|
||||
// Wait to download the block before downloading blobs. Then we can be sure that the
|
||||
// block has data, so there's no need to do "blind" requests for all possible blobs and
|
||||
// latter handle the case where if the peer sent no blobs, penalize.
|
||||
//
|
||||
// Lookup sync event safety: Reaching this code means that a block is not in any pre-import
|
||||
// cache nor in the request state of this lookup. Therefore, the block must either: (1) not
|
||||
// be downloaded yet or (2) the block is already imported into the fork-choice.
|
||||
// In case (1) the lookup must either successfully download the block or get dropped.
|
||||
// In case (2) the block will be downloaded, processed, reach `DuplicateFullyImported`
|
||||
// and get dropped as completed.
|
||||
}
|
||||
}
|
||||
|
||||
match &self.component_requests {
|
||||
ComponentRequests::WaitingForBlock => {} // do nothing
|
||||
ComponentRequests::ActiveBlobRequest(_, expected_blobs) => {
|
||||
self.continue_request::<BlobRequestState<T::EthSpec>>(cx, *expected_blobs)?
|
||||
}
|
||||
ComponentRequests::ActiveCustodyRequest(_) => {
|
||||
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx, 0)?
|
||||
}
|
||||
ComponentRequests::NotNeeded { .. } => {} // do nothing
|
||||
}
|
||||
|
||||
// If all components of this lookup are already processed, there will be no future events
|
||||
// that can make progress so it must be dropped. Consider the lookup completed.
|
||||
@@ -187,15 +264,12 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
fn continue_request<R: RequestState<T>>(
|
||||
&mut self,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
expected_blobs: usize,
|
||||
) -> Result<(), LookupRequestError> {
|
||||
let id = self.id;
|
||||
let awaiting_parent = self.awaiting_parent.is_some();
|
||||
let downloaded_block = self
|
||||
.block_request_state
|
||||
.state
|
||||
.peek_downloaded_data()
|
||||
.cloned();
|
||||
let request = R::request_state_mut(self);
|
||||
let request =
|
||||
R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
|
||||
|
||||
// Attempt to progress awaiting downloads
|
||||
if request.get_state().is_awaiting_download() {
|
||||
@@ -214,13 +288,16 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
// not receive any new peers for some time it will be dropped. If it receives a new
|
||||
// peer it must attempt to make progress.
|
||||
R::request_state_mut(self)
|
||||
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?
|
||||
.get_state_mut()
|
||||
.update_awaiting_download_status("no peers");
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let request = R::request_state_mut(self);
|
||||
match request.make_request(id, peer_id, downloaded_block, cx)? {
|
||||
let request = R::request_state_mut(self)
|
||||
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
|
||||
|
||||
match request.make_request(id, peer_id, expected_blobs, cx)? {
|
||||
LookupRequestResult::RequestSent(req_id) => {
|
||||
// Lookup sync event safety: If make_request returns `RequestSent`, we are
|
||||
// guaranteed that `BlockLookups::on_download_response` will be called exactly
|
||||
|
||||
@@ -632,45 +632,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
lookup_id: SingleLookupId,
|
||||
peer_id: PeerId,
|
||||
block_root: Hash256,
|
||||
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
expected_blobs: usize,
|
||||
) -> Result<LookupRequestResult, RpcRequestSendError> {
|
||||
let Some(block) = downloaded_block.or_else(|| {
|
||||
// If the block is already being processed or fully validated, retrieve how many blobs
|
||||
// it expects. Consider any stage of the block. If the block root has been validated, we
|
||||
// can assert that this is the correct value of `blob_kzg_commitments_count`.
|
||||
match self.chain.get_block_process_status(&block_root) {
|
||||
BlockProcessStatus::Unknown => None,
|
||||
BlockProcessStatus::NotValidated(block)
|
||||
| BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()),
|
||||
}
|
||||
}) else {
|
||||
// Wait to download the block before downloading blobs. Then we can be sure that the
|
||||
// block has data, so there's no need to do "blind" requests for all possible blobs and
|
||||
// latter handle the case where if the peer sent no blobs, penalize.
|
||||
// - if `downloaded_block_expected_blobs` is Some = block is downloading or processing.
|
||||
// - if `num_expected_blobs` returns Some = block is processed.
|
||||
//
|
||||
// Lookup sync event safety: Reaching this code means that a block is not in any pre-import
|
||||
// cache nor in the request state of this lookup. Therefore, the block must either: (1) not
|
||||
// be downloaded yet or (2) the block is already imported into the fork-choice.
|
||||
// In case (1) the lookup must either successfully download the block or get dropped.
|
||||
// In case (2) the block will be downloaded, processed, reach `DuplicateFullyImported`
|
||||
// and get dropped as completed.
|
||||
return Ok(LookupRequestResult::Pending("waiting for block download"));
|
||||
};
|
||||
let expected_blobs = block.num_expected_blobs();
|
||||
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
|
||||
|
||||
// Check if we are in deneb, before peerdas and inside da window
|
||||
if !self.chain.should_fetch_blobs(block_epoch) {
|
||||
return Ok(LookupRequestResult::NoRequestNeeded("blobs not required"));
|
||||
}
|
||||
|
||||
// No data required for this block
|
||||
if expected_blobs == 0 {
|
||||
return Ok(LookupRequestResult::NoRequestNeeded("no data"));
|
||||
}
|
||||
|
||||
let imported_blob_indexes = self
|
||||
.chain
|
||||
.data_availability_checker
|
||||
@@ -760,35 +723,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
&mut self,
|
||||
lookup_id: SingleLookupId,
|
||||
block_root: Hash256,
|
||||
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
) -> Result<LookupRequestResult, RpcRequestSendError> {
|
||||
let Some(block) =
|
||||
downloaded_block.or_else(|| match self.chain.get_block_process_status(&block_root) {
|
||||
BlockProcessStatus::Unknown => None,
|
||||
BlockProcessStatus::NotValidated(block)
|
||||
| BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()),
|
||||
})
|
||||
else {
|
||||
// Wait to download the block before downloading columns. Then we can be sure that the
|
||||
// block has data, so there's no need to do "blind" requests for all possible columns and
|
||||
// latter handle the case where if the peer sent no columns, penalize.
|
||||
// - if `downloaded_block_expected_blobs` is Some = block is downloading or processing.
|
||||
// - if `num_expected_blobs` returns Some = block is processed.
|
||||
return Ok(LookupRequestResult::Pending("waiting for block download"));
|
||||
};
|
||||
let expected_blobs = block.num_expected_blobs();
|
||||
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
|
||||
|
||||
// Check if we are into peerdas and inside da window
|
||||
if !self.chain.should_fetch_custody_columns(block_epoch) {
|
||||
return Ok(LookupRequestResult::NoRequestNeeded("columns not required"));
|
||||
}
|
||||
|
||||
// No data required for this block
|
||||
if expected_blobs == 0 {
|
||||
return Ok(LookupRequestResult::NoRequestNeeded("no data"));
|
||||
}
|
||||
|
||||
let custody_indexes_imported = self
|
||||
.chain
|
||||
.data_availability_checker
|
||||
|
||||
Reference in New Issue
Block a user