mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-10 01:26:44 +00:00
Remove RequestState trait from lookup sync (#9391)
Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>
This commit is contained in:
@@ -1,164 +0,0 @@
|
||||
use crate::sync::block_lookups::single_block_lookup::{
|
||||
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
|
||||
};
|
||||
use crate::sync::block_lookups::{BlockRequestState, CustodyRequestState, PeerId};
|
||||
use crate::sync::manager::BlockProcessType;
|
||||
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
|
||||
use beacon_chain::BeaconChainTypes;
|
||||
use lighthouse_network::service::api_types::Id;
|
||||
use parking_lot::RwLock;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use types::{DataColumnSidecarList, SignedBeaconBlock};
|
||||
|
||||
use super::SingleLookupId;
|
||||
use super::single_block_lookup::{ComponentRequests, DownloadResult};
|
||||
|
||||
/// Tags which lookup component a `RequestState` implementation handles. It is the return type
/// of `RequestState::response_type`, whose documentation describes it as useful in logging.
#[derive(Debug, Copy, Clone)]
|
||||
pub enum ResponseType {
|
||||
/// Returned by the `BlockRequestState` implementation of `RequestState`.
Block,
|
||||
/// Returned by the `CustodyRequestState` implementation of `RequestState`.
CustodyColumn,
|
||||
}
|
||||
|
||||
/// This trait unifies common single block lookup functionality across blocks and data columns.
|
||||
/// This includes making requests, verifying responses, and handling processing results. A
|
||||
/// `SingleBlockLookup` includes both a `BlockRequestState` and a `CustodyRequestState`, this trait
|
||||
/// is implemented for each.
|
||||
///
|
||||
/// The use of the `VerifiedResponseType` associated type gives us a degree of type
|
||||
/// safety when handling a block/column response ensuring we only mutate the correct corresponding
|
||||
/// state.
|
||||
pub trait RequestState<T: BeaconChainTypes> {
|
||||
/// The type created after validation.
|
||||
type VerifiedResponseType: Clone;
|
||||
|
||||
/// Request the network context to prepare a request of a component of `block_root`. If the
|
||||
/// request is not necessary because the component is already known / processed, return false.
|
||||
/// Return true if it sent a request and we can expect an event back from the network.
///
/// NOTE(review): the signature returns `Result<LookupRequestResult, LookupRequestError>`, not a
/// bool, so the true/false wording above looks outdated - confirm against `LookupRequestResult`.
///
/// Both implementations in this file ignore `expected_blobs` (they bind it as `_`).
|
||||
fn make_request(
|
||||
&self,
|
||||
id: Id,
|
||||
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||
expected_blobs: usize,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupRequestResult, LookupRequestError>;
|
||||
|
||||
/* Response handling methods */
|
||||
|
||||
/// Send the response to the beacon processor.
///
/// Associated function (no `self`): the downloaded value travels inside `result`. Both
/// implementations in this file map a send failure to `LookupRequestError::SendFailedProcessor`.
|
||||
fn send_for_processing(
|
||||
id: Id,
|
||||
result: DownloadResult<Self::VerifiedResponseType>,
|
||||
cx: &SyncNetworkContext<T>,
|
||||
) -> Result<(), LookupRequestError>;
|
||||
|
||||
/* Utility methods */
|
||||
|
||||
/// Returns the `ResponseType` associated with this trait implementation. Useful in logging.
|
||||
fn response_type() -> ResponseType;
|
||||
|
||||
/// A getter for the `BlockRequestState` or `CustodyRequestState` associated with this trait.
///
/// Returns `Err` with a static reason string when the lookup currently holds no such state.
|
||||
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str>;
|
||||
|
||||
/// A getter for a reference to the `SingleLookupRequestState` associated with this trait.
|
||||
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType>;
|
||||
|
||||
/// A getter for a mutable reference to the SingleLookupRequestState associated with this trait.
|
||||
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType>;
|
||||
}
|
||||
|
||||
/// `RequestState` implementation for the block component of a lookup.
impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
|
||||
/// A verified block response is a shared pointer to the signed block.
type VerifiedResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;
|
||||
|
||||
/// Asks the network context for the block with root `self.requested_block_root`, passing the
/// lookup's peer set through. The `usize` argument (`expected_blobs` in the trait) is unused.
/// A failure from the network context is wrapped in `LookupRequestError::SendFailedNetwork`.
fn make_request(
|
||||
&self,
|
||||
id: SingleLookupId,
|
||||
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||
_: usize,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupRequestResult, LookupRequestError> {
|
||||
cx.block_lookup_request(id, lookup_peers, self.requested_block_root)
|
||||
.map_err(LookupRequestError::SendFailedNetwork)
|
||||
}
|
||||
|
||||
/// Hands a downloaded block to `cx.send_block_for_processing`. A failure is wrapped in
/// `LookupRequestError::SendFailedProcessor`.
fn send_for_processing(
|
||||
id: SingleLookupId,
|
||||
download_result: DownloadResult<Self::VerifiedResponseType>,
|
||||
cx: &SyncNetworkContext<T>,
|
||||
) -> Result<(), LookupRequestError> {
|
||||
// Only the value, root and timestamp are forwarded; the remaining fields are dropped here.
let DownloadResult {
|
||||
value,
|
||||
block_root,
|
||||
seen_timestamp,
|
||||
..
|
||||
} = download_result;
|
||||
cx.send_block_for_processing(id, block_root, value, seen_timestamp)
|
||||
.map_err(LookupRequestError::SendFailedProcessor)
|
||||
}
|
||||
|
||||
/// Always `ResponseType::Block`.
fn response_type() -> ResponseType {
|
||||
ResponseType::Block
|
||||
}
|
||||
/// Never fails: returns the lookup's `block_request_state` field directly.
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
|
||||
Ok(&mut request.block_request_state)
|
||||
}
|
||||
/// Borrows the inner `state` field.
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
|
||||
&self.state
|
||||
}
|
||||
/// Mutably borrows the inner `state` field.
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType> {
|
||||
&mut self.state
|
||||
}
|
||||
}
|
||||
|
||||
/// `RequestState` implementation for the custody data column component of a lookup.
impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
|
||||
/// A verified custody response is a list of data column sidecars.
type VerifiedResponseType = DataColumnSidecarList<T::EthSpec>;
|
||||
|
||||
/// Asks the network context for the custody columns of `self.block_root` at `self.slot`,
/// passing the lookup's peer set through. The `usize` argument (`expected_blobs` in the trait)
/// is unused. A failure is wrapped in `LookupRequestError::SendFailedNetwork`.
fn make_request(
|
||||
&self,
|
||||
id: Id,
|
||||
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||
_: usize,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupRequestResult, LookupRequestError> {
|
||||
cx.custody_lookup_request(id, self.block_root, self.slot, lookup_peers)
|
||||
.map_err(LookupRequestError::SendFailedNetwork)
|
||||
}
|
||||
|
||||
/// Hands downloaded custody columns to `cx.send_custody_columns_for_processing`, tagged with
/// `BlockProcessType::SingleCustodyColumn(id)`. A failure is wrapped in
/// `LookupRequestError::SendFailedProcessor`.
fn send_for_processing(
|
||||
id: Id,
|
||||
download_result: DownloadResult<Self::VerifiedResponseType>,
|
||||
cx: &SyncNetworkContext<T>,
|
||||
) -> Result<(), LookupRequestError> {
|
||||
// Only the value, root and timestamp are forwarded; the remaining fields are dropped here.
let DownloadResult {
|
||||
value,
|
||||
block_root,
|
||||
seen_timestamp,
|
||||
..
|
||||
} = download_result;
|
||||
cx.send_custody_columns_for_processing(
|
||||
id,
|
||||
block_root,
|
||||
value,
|
||||
seen_timestamp,
|
||||
BlockProcessType::SingleCustodyColumn(id),
|
||||
)
|
||||
.map_err(LookupRequestError::SendFailedProcessor)
|
||||
}
|
||||
|
||||
/// Always `ResponseType::CustodyColumn`.
fn response_type() -> ResponseType {
|
||||
ResponseType::CustodyColumn
|
||||
}
|
||||
/// Returns the custody request state only while `component_requests` is
/// `ActiveCustodyRequest`; the other two variants yield an `Err` naming the reason.
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
|
||||
match &mut request.component_requests {
|
||||
ComponentRequests::WaitingForBlock => Err("waiting for block"),
|
||||
ComponentRequests::ActiveCustodyRequest(request) => Ok(request),
|
||||
ComponentRequests::NotNeeded { .. } => Err("not needed"),
|
||||
}
|
||||
}
|
||||
/// Borrows the inner `state` field.
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
|
||||
&self.state
|
||||
}
|
||||
/// Mutably borrows the inner `state` field.
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType> {
|
||||
&mut self.state
|
||||
}
|
||||
}
|
||||
@@ -24,27 +24,23 @@ use self::parent_chain::{NodeChain, compute_parent_chains};
|
||||
pub use self::single_block_lookup::DownloadResult;
|
||||
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
|
||||
use super::manager::{BlockProcessType, SLOT_IMPORT_TOLERANCE};
|
||||
use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext};
|
||||
use super::network_context::{RpcResponseError, SyncNetworkContext};
|
||||
use crate::metrics;
|
||||
use crate::network_beacon_processor::BlockProcessingResult;
|
||||
use crate::sync::SyncMessage;
|
||||
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
|
||||
use beacon_chain::BeaconChainTypes;
|
||||
use beacon_chain::block_verification_types::AsBlock;
|
||||
pub use common::RequestState;
|
||||
use fnv::FnvHashMap;
|
||||
use lighthouse_network::PeerId;
|
||||
use lighthouse_network::service::api_types::SingleLookupReqId;
|
||||
use lru_cache::LRUTimeCache;
|
||||
pub use single_block_lookup::{BlockRequestState, CustodyRequestState};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use store::Hash256;
|
||||
use tracing::{debug, error, warn};
|
||||
use types::{EthSpec, SignedBeaconBlock};
|
||||
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock};
|
||||
|
||||
pub mod common;
|
||||
pub mod parent_chain;
|
||||
mod single_block_lookup;
|
||||
|
||||
@@ -74,35 +70,17 @@ const LOOKUP_MAX_DURATION_NO_PEERS_SECS: u64 = 10;
|
||||
/// take at most 2 GB. 200 lookups allow 3 parallel chains of depth 64 (current maximum).
|
||||
const MAX_LOOKUPS: usize = 200;
|
||||
|
||||
/// The value for `Sidecar` is the parent root of the sidecar.
|
||||
type BlockDownloadResponse<E> = Result<DownloadResult<Arc<SignedBeaconBlock<E>>>, RpcResponseError>;
|
||||
type CustodyDownloadResponse<E> =
|
||||
Result<DownloadResult<DataColumnSidecarList<E>>, RpcResponseError>;
|
||||
|
||||
pub enum BlockComponent<E: EthSpec> {
|
||||
Block(DownloadResult<Arc<SignedBeaconBlock<E>>>),
|
||||
Sidecar { parent_root: Hash256 },
|
||||
}
|
||||
|
||||
impl<E: EthSpec> BlockComponent<E> {
|
||||
fn parent_root(&self) -> Hash256 {
|
||||
match self {
|
||||
BlockComponent::Block(block) => block.value.parent_root(),
|
||||
BlockComponent::Sidecar { parent_root } => *parent_root,
|
||||
}
|
||||
}
|
||||
fn get_type(&self) -> &'static str {
|
||||
match self {
|
||||
BlockComponent::Block(_) => "block",
|
||||
BlockComponent::Sidecar { .. } => "sidecar",
|
||||
}
|
||||
}
|
||||
Sidecar,
|
||||
}
|
||||
|
||||
pub type SingleLookupId = u32;
|
||||
|
||||
/// Follow-up step chosen after a lookup component's processing result has been inspected.
enum Action {
|
||||
/// Call `continue_requests` on the lookup again so its components can make progress.
Retry,
|
||||
/// The block's parent is unknown: mark the lookup as awaiting `parent_root` and search for
/// the parent.
ParentUnknown { parent_root: Hash256 },
|
||||
/// The lookup is done and resolves to `LookupResult::Completed`.
Continue,
|
||||
}
|
||||
|
||||
pub struct BlockLookups<T: BeaconChainTypes> {
|
||||
/// A cache of block roots that must be ignored for some time to prevent useless searches. For
|
||||
/// example if a chain is too long, its lookup chain is dropped, and range sync is expected to
|
||||
@@ -190,11 +168,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
&mut self,
|
||||
block_root: Hash256,
|
||||
block_component: BlockComponent<T::EthSpec>,
|
||||
parent_root: Hash256,
|
||||
peer_id: PeerId,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> bool {
|
||||
let parent_root = block_component.parent_root();
|
||||
|
||||
let parent_lookup_exists =
|
||||
self.search_parent_of_child(parent_root, block_root, &[peer_id], cx);
|
||||
// Only create the child lookup if the parent exists
|
||||
@@ -215,7 +192,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Seach a block whose parent root is unknown.
|
||||
/// Search a block whose parent root is unknown.
|
||||
///
|
||||
/// Returns true if the lookup is created or already exists
|
||||
#[must_use = "only reference the new lookup if returns true"]
|
||||
@@ -358,13 +335,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
.find(|(_id, lookup)| lookup.is_for_block(block_root))
|
||||
{
|
||||
if let Some(block_component) = block_component {
|
||||
let component_type = block_component.get_type();
|
||||
let imported = lookup.add_child_components(block_component);
|
||||
if !imported {
|
||||
debug!(
|
||||
?block_root,
|
||||
component_type, "Lookup child component ignored"
|
||||
);
|
||||
debug!(?block_root, "Lookup child component ignored");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -436,88 +409,33 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
|
||||
/* Lookup responses */
|
||||
|
||||
/// Process a block or blob response received from a single lookup request.
|
||||
pub fn on_download_response<R: RequestState<T>>(
|
||||
/// Process a block response received from a single lookup request.
|
||||
pub fn on_block_download_response(
|
||||
&mut self,
|
||||
id: SingleLookupReqId,
|
||||
response: Result<(R::VerifiedResponseType, PeerGroup, Duration), RpcResponseError>,
|
||||
response: BlockDownloadResponse<T::EthSpec>,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
let result = self.on_download_response_inner::<R>(id, response, cx);
|
||||
self.on_lookup_result(id.lookup_id, result, "download_response", cx);
|
||||
let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else {
|
||||
debug!(?id, "Block returned for single block lookup not present");
|
||||
return;
|
||||
};
|
||||
let result = lookup.on_block_download_response(id.req_id, response, cx);
|
||||
self.on_lookup_result(id.lookup_id, result, "block_download_response", cx);
|
||||
}
|
||||
|
||||
/// Process a block or blob response received from a single lookup request.
|
||||
pub fn on_download_response_inner<R: RequestState<T>>(
|
||||
pub fn on_custody_download_response(
|
||||
&mut self,
|
||||
id: SingleLookupReqId,
|
||||
response: Result<(R::VerifiedResponseType, PeerGroup, Duration), RpcResponseError>,
|
||||
response: CustodyDownloadResponse<T::EthSpec>,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
// Note: no need to downscore peers here, already downscored on network context
|
||||
|
||||
let response_type = R::response_type();
|
||||
) {
|
||||
let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else {
|
||||
// We don't have the ability to cancel in-flight RPC requests. So this can happen
|
||||
// if we started this RPC request, and later saw the block/blobs via gossip.
|
||||
debug!(?id, "Block returned for single block lookup not present");
|
||||
return Err(LookupRequestError::UnknownLookup);
|
||||
debug!(?id, "Custody returned for single block lookup not present");
|
||||
return;
|
||||
};
|
||||
|
||||
let block_root = lookup.block_root();
|
||||
let request_state = R::request_state_mut(lookup)
|
||||
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?
|
||||
.get_state_mut();
|
||||
|
||||
match response {
|
||||
Ok((response, peer_group, seen_timestamp)) => {
|
||||
debug!(
|
||||
?block_root,
|
||||
?id,
|
||||
?peer_group,
|
||||
?response_type,
|
||||
"Received lookup download success"
|
||||
);
|
||||
|
||||
// Here we could check if response extends a parent chain beyond its max length.
|
||||
// However we defer that check to the handling of a processing error ParentUnknown.
|
||||
//
|
||||
// Here we could check if there's already a lookup for parent_root of `response`. In
|
||||
// that case we know that sending the response for processing will likely result in
|
||||
// a `ParentUnknown` error. However, for simplicity we choose to not implement this
|
||||
// optimization.
|
||||
|
||||
// Register the download peer here. Once we have received some data over the wire we
|
||||
// attribute it to this peer for scoring later regardless of how the request was
|
||||
// done.
|
||||
request_state.on_download_success(
|
||||
id.req_id,
|
||||
DownloadResult {
|
||||
value: response,
|
||||
block_root,
|
||||
seen_timestamp,
|
||||
peer_group,
|
||||
},
|
||||
)?;
|
||||
// continue_request will send for processing as the request state is AwaitingProcessing
|
||||
}
|
||||
Err(e) => {
|
||||
// No need to log peer source here. When sending a DataColumnsByRoot request we log
|
||||
// the peer and the request ID which is linked to this `id` value here.
|
||||
debug!(
|
||||
?block_root,
|
||||
?id,
|
||||
?response_type,
|
||||
error = ?e,
|
||||
"Received lookup download failure"
|
||||
);
|
||||
|
||||
request_state.on_download_failure(id.req_id)?;
|
||||
// continue_request will retry a download as the request state is AwaitingDownload
|
||||
}
|
||||
}
|
||||
|
||||
lookup.continue_requests(cx)
|
||||
let result = lookup.on_custody_download_response(id.req_id, response, cx);
|
||||
self.on_lookup_result(id.lookup_id, result, "custody_download_response", cx);
|
||||
}
|
||||
|
||||
/* Error responses */
|
||||
@@ -539,128 +457,29 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
result: BlockProcessingResult,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
let lookup_result = match process_type {
|
||||
BlockProcessType::SingleBlock { id } => {
|
||||
self.on_processing_result_inner::<BlockRequestState<T::EthSpec>>(id, result, cx)
|
||||
}
|
||||
BlockProcessType::SingleCustodyColumn(id) => {
|
||||
self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
|
||||
}
|
||||
// TODO(gloas): route into the payload envelope lookup state machine.
|
||||
BlockProcessType::SinglePayloadEnvelope(_) => Ok(LookupResult::Pending),
|
||||
};
|
||||
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
|
||||
}
|
||||
|
||||
pub fn on_processing_result_inner<R: RequestState<T>>(
|
||||
&mut self,
|
||||
lookup_id: SingleLookupId,
|
||||
result: BlockProcessingResult,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
let lookup_id = process_type.id();
|
||||
let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else {
|
||||
debug!(id = lookup_id, "Unknown single block lookup");
|
||||
return Err(LookupRequestError::UnknownLookup);
|
||||
return;
|
||||
};
|
||||
|
||||
let block_root = lookup.block_root();
|
||||
let request_state = R::request_state_mut(lookup)
|
||||
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?
|
||||
.get_state_mut();
|
||||
|
||||
debug!(
|
||||
component = ?R::response_type(),
|
||||
?block_root,
|
||||
block_root = ?lookup.block_root(),
|
||||
id = lookup_id,
|
||||
?process_type,
|
||||
?result,
|
||||
"Received lookup processing result"
|
||||
);
|
||||
|
||||
let action = match result {
|
||||
BlockProcessingResult::Imported(fully_imported, _info) => {
|
||||
// `on_processing_success` is called here to ensure the request state is updated
|
||||
// prior to checking if all components have been processed (relevant for
|
||||
// MissingComponents).
|
||||
request_state.on_processing_success()?;
|
||||
|
||||
if fully_imported {
|
||||
Action::Continue
|
||||
} else if lookup.all_components_processed() {
|
||||
// We don't request for other block components until being sure that the block has
|
||||
// data. If we request blobs / columns to a peer we are sure those must exist.
|
||||
// Therefore if all components are processed and we still receive `MissingComponents`
|
||||
// it indicates an internal bug.
|
||||
return Err(LookupRequestError::Failed(
|
||||
"missing components after all processed".to_owned(),
|
||||
));
|
||||
} else {
|
||||
Action::Retry
|
||||
}
|
||||
}
|
||||
BlockProcessingResult::ParentUnknown { parent_root } => {
|
||||
// `BlockError::ParentUnknown` is only returned when processing blocks. Reverts
|
||||
// the status of this request to `AwaitingProcessing` holding the downloaded
|
||||
// data. A future call to `continue_requests` will re-submit it once there are
|
||||
// no pending parent requests.
|
||||
request_state.revert_to_awaiting_processing()?;
|
||||
Action::ParentUnknown { parent_root }
|
||||
}
|
||||
BlockProcessingResult::Error { penalty, reason } => {
|
||||
// Retry on every processing error: `on_processing_failure` increments the
|
||||
// per-component failure counter, so `SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS` bounds the
|
||||
// retry loop and eventually drops the lookup if the failure persists. Whether the
|
||||
// peer should be downscored is the producer's call (encoded in `penalty`).
|
||||
debug!(
|
||||
?block_root,
|
||||
component = ?R::response_type(),
|
||||
reason,
|
||||
?penalty,
|
||||
"Lookup component processing failed; retrying"
|
||||
);
|
||||
let peer_group = request_state.on_processing_failure()?;
|
||||
if let Some((action_kind, whom, msg)) = penalty {
|
||||
whom.apply(action_kind, &peer_group, msg, cx);
|
||||
}
|
||||
Action::Retry
|
||||
let lookup_result = match process_type {
|
||||
BlockProcessType::SingleBlock { .. } => lookup.on_block_processing_result(result, cx),
|
||||
BlockProcessType::SingleCustodyColumn(_) => {
|
||||
lookup.on_data_processing_result(result, cx)
|
||||
}
|
||||
// TODO(gloas): route into the payload envelope lookup state machine.
|
||||
BlockProcessType::SinglePayloadEnvelope(_) => Ok(LookupResult::Pending),
|
||||
};
|
||||
|
||||
match action {
|
||||
Action::Retry => {
|
||||
// Trigger download for all components in case `MissingComponents` failed the blob
|
||||
// request. Also if blobs are `AwaitingProcessing` and need to be progressed
|
||||
lookup.continue_requests(cx)
|
||||
}
|
||||
Action::ParentUnknown { parent_root } => {
|
||||
let peers = lookup.all_peers();
|
||||
// Mark lookup as awaiting **before** creating the parent lookup. At this point the
|
||||
// lookup may be inconsistent.
|
||||
lookup.set_awaiting_parent(parent_root);
|
||||
let parent_lookup_exists =
|
||||
self.search_parent_of_child(parent_root, block_root, &peers, cx);
|
||||
if parent_lookup_exists {
|
||||
// The parent lookup exists or has been created. It's safe for `lookup` to
|
||||
// reference the parent as awaiting.
|
||||
debug!(
|
||||
id = lookup_id,
|
||||
?block_root,
|
||||
?parent_root,
|
||||
"Marking lookup as awaiting parent"
|
||||
);
|
||||
Ok(LookupResult::Pending)
|
||||
} else {
|
||||
// The parent lookup is faulty and was not created, we must drop the `lookup` as
|
||||
// it's in an inconsistent state. We must drop all of its children too.
|
||||
Err(LookupRequestError::Failed(format!(
|
||||
"Parent lookup is faulty {parent_root:?}"
|
||||
)))
|
||||
}
|
||||
}
|
||||
Action::Continue => {
|
||||
// Drop this completed lookup only
|
||||
Ok(LookupResult::Completed)
|
||||
}
|
||||
}
|
||||
self.on_lookup_result(lookup_id, lookup_result, "processing_result", cx);
|
||||
}
|
||||
|
||||
pub fn on_external_processing_result(
|
||||
@@ -757,7 +576,20 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> bool {
|
||||
match result {
|
||||
Ok(LookupResult::Pending) => true, // no action
|
||||
Ok(LookupResult::Pending) => true,
|
||||
Ok(LookupResult::ParentUnknown {
|
||||
parent_root,
|
||||
block_root,
|
||||
peers,
|
||||
}) => {
|
||||
if self.search_parent_of_child(parent_root, block_root, &peers, cx) {
|
||||
true
|
||||
} else {
|
||||
self.drop_lookup_and_children(id, "Failed");
|
||||
self.update_metrics();
|
||||
false
|
||||
}
|
||||
}
|
||||
Ok(LookupResult::Completed) => {
|
||||
if let Some(lookup) = self.single_block_lookups.remove(&id) {
|
||||
debug!(
|
||||
@@ -923,6 +755,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
}
|
||||
|
||||
/// Adds peers to a lookup and its ancestors recursively.
|
||||
///
|
||||
/// Note: Takes a `lookup_id` as argument to allow recursion on mutable lookups, without having
|
||||
/// to duplicate the code to add peers to a lookup
|
||||
fn add_peers_to_lookup_and_ancestors(
|
||||
@@ -949,12 +782,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
}
|
||||
|
||||
if let Some(parent_root) = lookup.awaiting_parent() {
|
||||
if let Some((&child_id, _)) = self
|
||||
if let Some((&parent_id, _)) = self
|
||||
.single_block_lookups
|
||||
.iter()
|
||||
.find(|(_, l)| l.block_root() == parent_root)
|
||||
{
|
||||
self.add_peers_to_lookup_and_ancestors(child_id, peers, cx)
|
||||
self.add_peers_to_lookup_and_ancestors(parent_id, peers, cx)
|
||||
} else {
|
||||
Err(format!("Lookup references unknown parent {parent_root:?}"))
|
||||
}
|
||||
|
||||
@@ -1,15 +1,17 @@
|
||||
use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS};
|
||||
use crate::sync::block_lookups::common::RequestState;
|
||||
use crate::network_beacon_processor::BlockProcessingResult;
|
||||
use crate::sync::block_lookups::{BlockDownloadResponse, CustodyDownloadResponse};
|
||||
use crate::sync::manager::BlockProcessType;
|
||||
use crate::sync::network_context::{
|
||||
LookupRequestResult, PeerGroup, ReqId, RpcRequestSendError, SendErrorProcessor,
|
||||
SyncNetworkContext,
|
||||
LookupRequestResult, PeerGroup, ReqId, RpcRequestSendError, RpcResponseError,
|
||||
SendErrorProcessor, SyncNetworkContext,
|
||||
};
|
||||
use beacon_chain::{BeaconChainTypes, BlockProcessStatus};
|
||||
use beacon_chain::BeaconChainTypes;
|
||||
use beacon_chain::block_verification_types::AsBlock;
|
||||
use educe::Educe;
|
||||
use lighthouse_network::service::api_types::Id;
|
||||
use parking_lot::RwLock;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use store::Hash256;
|
||||
@@ -24,15 +26,18 @@ pub enum LookupResult {
|
||||
Completed,
|
||||
/// Lookup is expecting some future event from the network
|
||||
Pending,
|
||||
/// Block's parent is not known to fork-choice, a parent lookup is needed
|
||||
ParentUnknown {
|
||||
parent_root: Hash256,
|
||||
block_root: Hash256,
|
||||
peers: Vec<PeerId>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
|
||||
pub enum LookupRequestError {
|
||||
/// Too many failed attempts
|
||||
TooManyAttempts {
|
||||
/// The failed attempts were primarily due to processing failures.
|
||||
cannot_process: bool,
|
||||
},
|
||||
TooManyAttempts,
|
||||
/// Error sending event to network
|
||||
SendFailedNetwork(RpcRequestSendError),
|
||||
/// Error sending event to processor
|
||||
@@ -52,33 +57,63 @@ pub enum LookupRequestError {
|
||||
},
|
||||
}
|
||||
|
||||
/// The block part of a single block lookup: a thin wrapper around the request state of the
/// block download.
#[derive(Debug)]
|
||||
struct BlockRequest<E: EthSpec> {
|
||||
/// Request state whose downloaded value is a shared pointer to the signed block.
state: SingleLookupRequestState<Arc<SignedBeaconBlock<E>>>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> BlockRequest<E> {
|
||||
/// Creates a block request with a fresh `SingleLookupRequestState`.
fn new() -> Self {
|
||||
Self {
|
||||
state: SingleLookupRequestState::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true once the underlying request state reports `is_processed()`.
fn is_complete(&self) -> bool {
|
||||
self.state.is_processed()
|
||||
}
|
||||
}
|
||||
|
||||
/// The data part of a single block lookup (`DataColumnSidecarList` in the `Request` variant).
#[derive(Debug)]
|
||||
enum DataRequest<E: EthSpec> {
|
||||
/// Initial value set by `SingleBlockLookup::new` and `reset_requests`; never complete.
/// Presumably the block must be known before data can be requested - confirm.
WaitingForBlock,
|
||||
/// A data request with its own request state.
Request {
|
||||
// NOTE(review): presumably the slot of the block this data belongs to - confirm.
slot: Slot,
|
||||
state: SingleLookupRequestState<DataColumnSidecarList<E>>,
|
||||
},
|
||||
/// Nothing to download; always counts as complete.
NoData,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> DataRequest<E> {
|
||||
/// Returns true when no further data work is outstanding: never while `WaitingForBlock`,
/// once the inner state reports `is_processed()` for `Request`, and always for `NoData`.
fn is_complete(&self) -> bool {
|
||||
match &self {
|
||||
DataRequest::WaitingForBlock => false,
|
||||
DataRequest::Request { state, .. } => state.is_processed(),
|
||||
DataRequest::NoData => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type PeerSet = Arc<RwLock<HashSet<PeerId>>>;
|
||||
|
||||
#[derive(Educe)]
|
||||
#[educe(Debug(bound(T: BeaconChainTypes)))]
|
||||
pub struct SingleBlockLookup<T: BeaconChainTypes> {
|
||||
pub id: Id,
|
||||
pub block_request_state: BlockRequestState<T::EthSpec>,
|
||||
pub component_requests: ComponentRequests<T::EthSpec>,
|
||||
block_root: Hash256,
|
||||
block_request: BlockRequest<T::EthSpec>,
|
||||
data_request: DataRequest<T::EthSpec>,
|
||||
/// Peers that claim to have imported this set of block components. This state is shared with
|
||||
/// the custody request to have an updated view of the peers that claim to have imported the
|
||||
/// block associated with this lookup. The peer set of a lookup can change rapidly, and faster
|
||||
/// than the lifetime of a custody request.
|
||||
#[educe(Debug(method(fmt_peer_set_as_len)))]
|
||||
peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||
block_root: Hash256,
|
||||
peers: PeerSet,
|
||||
awaiting_parent: Option<Hash256>,
|
||||
created: Instant,
|
||||
pub(crate) span: Span,
|
||||
}
|
||||
|
||||
/// State of the non-block component (custody columns) of a `SingleBlockLookup`.
#[derive(Debug)]
|
||||
pub(crate) enum ComponentRequests<E: EthSpec> {
|
||||
/// The block is not available yet, so it is unknown whether custody columns are required.
WaitingForBlock,
|
||||
/// Custody columns are required and are tracked by this request state.
ActiveCustodyRequest(CustodyRequestState<E>),
|
||||
// When printed with `Debug`, this state displays the reason why the request is not needed
|
||||
#[allow(dead_code)]
|
||||
NotNeeded(&'static str),
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
pub fn new(
|
||||
requested_block_root: Hash256,
|
||||
@@ -94,25 +129,25 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
|
||||
Self {
|
||||
id,
|
||||
block_request_state: BlockRequestState::new(requested_block_root),
|
||||
component_requests: ComponentRequests::WaitingForBlock,
|
||||
peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))),
|
||||
block_root: requested_block_root,
|
||||
block_request: BlockRequest::new(),
|
||||
data_request: DataRequest::WaitingForBlock,
|
||||
peers: Arc::new(RwLock::new(peers.iter().copied().collect())),
|
||||
awaiting_parent,
|
||||
created: Instant::now(),
|
||||
span: lookup_span,
|
||||
}
|
||||
}
|
||||
|
||||
/// Reset the status of all internal requests
|
||||
/// Reset the status of all requests (used on block processing failure)
|
||||
pub fn reset_requests(&mut self) {
|
||||
self.block_request_state = BlockRequestState::new(self.block_root);
|
||||
self.component_requests = ComponentRequests::WaitingForBlock;
|
||||
self.block_request = BlockRequest::new();
|
||||
self.data_request = DataRequest::WaitingForBlock;
|
||||
}
|
||||
|
||||
/// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing`
|
||||
/// Return the slot of this lookup's block if it's currently cached
|
||||
pub fn peek_downloaded_block_slot(&self) -> Option<Slot> {
|
||||
self.block_request_state
|
||||
self.block_request
|
||||
.state
|
||||
.peek_downloaded_data()
|
||||
.map(|block| block.slot())
|
||||
@@ -147,15 +182,12 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
/// Maybe insert a verified response into this lookup. Returns true if imported
|
||||
pub fn add_child_components(&mut self, block_component: BlockComponent<T::EthSpec>) -> bool {
|
||||
match block_component {
|
||||
BlockComponent::Block(block) => self
|
||||
.block_request_state
|
||||
.state
|
||||
.insert_verified_response(block),
|
||||
BlockComponent::Sidecar { .. } => {
|
||||
// For now ignore single blobs and columns, as the blob request state assumes all blobs are
|
||||
// attributed to the same peer = the peer serving the remaining blobs. Ignoring this
|
||||
// block component has a minor effect, causing the node to re-request this blob
|
||||
// once the parent chain is successfully resolved
|
||||
BlockComponent::Block(block) => {
|
||||
self.block_request.state.insert_verified_response(block)
|
||||
}
|
||||
BlockComponent::Sidecar => {
|
||||
// There's nothing to do here, there's no component to insert. The lookup downloads
|
||||
// its required data columns itself once it has the block.
|
||||
false
|
||||
}
|
||||
}
|
||||
@@ -166,29 +198,14 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
self.block_root() == block_root
|
||||
}
|
||||
|
||||
/// Returns true if the block has already been downloaded.
|
||||
pub fn all_components_processed(&self) -> bool {
|
||||
self.block_request_state.state.is_processed()
|
||||
&& match &self.component_requests {
|
||||
ComponentRequests::WaitingForBlock => false,
|
||||
ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(),
|
||||
ComponentRequests::NotNeeded { .. } => true,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if this request is expecting some event to make progress
|
||||
pub fn is_awaiting_event(&self) -> bool {
|
||||
self.awaiting_parent.is_some()
|
||||
|| self.block_request_state.state.is_awaiting_event()
|
||||
|| match &self.component_requests {
|
||||
// If components are waiting for the block request to complete, here we should
|
||||
// check if `block_request_state.state.is_awaiting_event()`. However we already
|
||||
// checked that above, so `WaitingForBlock => false` is equivalent.
|
||||
ComponentRequests::WaitingForBlock => false,
|
||||
ComponentRequests::ActiveCustodyRequest(request) => {
|
||||
request.state.is_awaiting_event()
|
||||
}
|
||||
ComponentRequests::NotNeeded { .. } => false,
|
||||
|| self.block_request.state.is_awaiting_event()
|
||||
|| match &self.data_request {
|
||||
DataRequest::WaitingForBlock => true,
|
||||
DataRequest::Request { state, .. } => state.is_awaiting_event(),
|
||||
DataRequest::NoData => false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -199,139 +216,167 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
let _guard = self.span.clone().entered();
|
||||
// TODO: Check what's necessary to download, specially for blobs
|
||||
self.continue_request::<BlockRequestState<T::EthSpec>>(cx, 0)?;
|
||||
|
||||
if let ComponentRequests::WaitingForBlock = self.component_requests {
|
||||
let downloaded_block = self
|
||||
.block_request_state
|
||||
.state
|
||||
.peek_downloaded_data()
|
||||
.cloned();
|
||||
|
||||
if let Some(block) = downloaded_block.or_else(|| {
|
||||
// If the block is already being processed or fully validated, retrieve how many blobs
|
||||
// it expects. Consider any stage of the block. If the block root has been validated, we
|
||||
// can assert that this is the correct value of `blob_kzg_commitments_count`.
|
||||
match cx.chain.get_block_process_status(&self.block_root) {
|
||||
BlockProcessStatus::Unknown => None,
|
||||
BlockProcessStatus::NotValidated(block, _)
|
||||
| BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()),
|
||||
}
|
||||
}) {
|
||||
let expected_blobs = block.num_expected_blobs();
|
||||
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
|
||||
if expected_blobs == 0 {
|
||||
self.component_requests = ComponentRequests::NotNeeded("no data");
|
||||
} else if cx.chain.should_fetch_custody_columns(block_epoch) {
|
||||
self.component_requests = ComponentRequests::ActiveCustodyRequest(
|
||||
CustodyRequestState::new(self.block_root, block.slot()),
|
||||
);
|
||||
} else {
|
||||
self.component_requests = ComponentRequests::NotNeeded("outside da window");
|
||||
}
|
||||
} else {
|
||||
// Wait to download the block before downloading blobs. Then we can be sure that the
|
||||
// block has data, so there's no need to do "blind" requests for all possible blobs and
|
||||
// latter handle the case where if the peer sent no blobs, penalize.
|
||||
//
|
||||
// Lookup sync event safety: Reaching this code means that a block is not in any pre-import
|
||||
// cache nor in the request state of this lookup. Therefore, the block must either: (1) not
|
||||
// be downloaded yet or (2) the block is already imported into the fork-choice.
|
||||
// In case (1) the lookup must either successfully download the block or get dropped.
|
||||
// In case (2) the block will be downloaded, processed, reach `DuplicateFullyImported`
|
||||
// and get dropped as completed.
|
||||
}
|
||||
// === Block request ===
|
||||
self.block_request.state.maybe_start_downloading(|| {
|
||||
cx.block_lookup_request(self.id, self.peers.clone(), self.block_root)
|
||||
})?;
|
||||
if self.awaiting_parent.is_none()
|
||||
&& let Some(data) = self.block_request.state.maybe_start_processing()
|
||||
{
|
||||
cx.send_block_for_processing(self.id, self.block_root, data.value, data.seen_timestamp)
|
||||
.map_err(LookupRequestError::SendFailedProcessor)?;
|
||||
}
|
||||
|
||||
match &self.component_requests {
|
||||
ComponentRequests::WaitingForBlock => {} // do nothing
|
||||
ComponentRequests::ActiveCustodyRequest(_) => {
|
||||
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx, 0)?
|
||||
// === Data request ===
|
||||
loop {
|
||||
match &mut self.data_request {
|
||||
DataRequest::WaitingForBlock => {
|
||||
if let Some(block) = self.block_request.state.peek_downloaded_data() {
|
||||
let block_epoch = block
|
||||
.slot()
|
||||
.epoch(<T as BeaconChainTypes>::EthSpec::slots_per_epoch());
|
||||
self.data_request = if block.num_expected_blobs() == 0 {
|
||||
DataRequest::NoData
|
||||
} else if cx.chain.should_fetch_custody_columns(block_epoch) {
|
||||
DataRequest::Request {
|
||||
slot: block.slot(),
|
||||
state: SingleLookupRequestState::new(),
|
||||
}
|
||||
} else {
|
||||
DataRequest::NoData
|
||||
};
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
DataRequest::Request { slot, state } => {
|
||||
state.maybe_start_downloading(|| {
|
||||
cx.custody_lookup_request(
|
||||
self.id,
|
||||
self.block_root,
|
||||
*slot,
|
||||
self.peers.clone(),
|
||||
)
|
||||
})?;
|
||||
// Wait for the parent to be imported, data column processing result handler does
|
||||
// not support `ParentUnknown`.
|
||||
if self.awaiting_parent.is_none()
|
||||
&& let Some(data) = state.maybe_start_processing()
|
||||
{
|
||||
cx.send_custody_columns_for_processing(
|
||||
self.id,
|
||||
self.block_root,
|
||||
data.value,
|
||||
data.seen_timestamp,
|
||||
BlockProcessType::SingleCustodyColumn(self.id),
|
||||
)
|
||||
.map_err(LookupRequestError::SendFailedProcessor)?;
|
||||
}
|
||||
break;
|
||||
}
|
||||
DataRequest::NoData => break,
|
||||
}
|
||||
ComponentRequests::NotNeeded { .. } => {} // do nothing
|
||||
}
|
||||
|
||||
// If all components of this lookup are already processed, there will be no future events
|
||||
// that can make progress so it must be dropped. Consider the lookup completed.
|
||||
// This case can happen if we receive the components from gossip during a retry.
|
||||
if self.all_components_processed() {
|
||||
self.span = Span::none();
|
||||
Ok(LookupResult::Completed)
|
||||
} else {
|
||||
Ok(LookupResult::Pending)
|
||||
if self.block_request.is_complete() && self.data_request.is_complete() {
|
||||
return Ok(LookupResult::Completed);
|
||||
}
|
||||
|
||||
Ok(LookupResult::Pending)
|
||||
}
|
||||
|
||||
/// Potentially makes progress on this request if it's in a progress-able state
|
||||
fn continue_request<R: RequestState<T>>(
|
||||
/// Handle block processing result. Advances the lookup state machine.
|
||||
pub fn on_block_processing_result(
|
||||
&mut self,
|
||||
result: BlockProcessingResult,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
expected_blobs: usize,
|
||||
) -> Result<(), LookupRequestError> {
|
||||
let id = self.id;
|
||||
let awaiting_parent = self.awaiting_parent.is_some();
|
||||
let request =
|
||||
R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
|
||||
|
||||
// Attempt to progress awaiting downloads
|
||||
if request.get_state().is_awaiting_download() {
|
||||
// Verify the current request has not exceeded the maximum number of attempts.
|
||||
let request_state = request.get_state();
|
||||
if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS {
|
||||
let cannot_process = request_state.more_failed_processing_attempts();
|
||||
return Err(LookupRequestError::TooManyAttempts { cannot_process });
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
match result {
|
||||
BlockProcessingResult::Imported(_fully_imported, _info) => {
|
||||
self.block_request.state.on_processing_success()?;
|
||||
}
|
||||
|
||||
let peers = self.peers.clone();
|
||||
let request = R::request_state_mut(self)
|
||||
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
|
||||
|
||||
match request.make_request(id, peers, expected_blobs, cx)? {
|
||||
LookupRequestResult::RequestSent(req_id) => {
|
||||
// Lookup sync event safety: If make_request returns `RequestSent`, we are
|
||||
// guaranteed that `BlockLookups::on_download_response` will be called exactly
|
||||
// with this `req_id`.
|
||||
request.get_state_mut().on_download_start(req_id)?
|
||||
}
|
||||
LookupRequestResult::NoRequestNeeded(reason) => {
|
||||
// Lookup sync event safety: Advances this request to the terminal `Processed`
|
||||
// state. If all requests reach this state, the request is marked as completed
|
||||
// in `Self::continue_requests`.
|
||||
request.get_state_mut().on_completed_request(reason)?
|
||||
}
|
||||
// Sync will receive a future event to make progress on the request, do nothing now
|
||||
LookupRequestResult::Pending(reason) => {
|
||||
// Lookup sync event safety: Refer to the code paths constructing
|
||||
// `LookupRequestResult::Pending`
|
||||
request
|
||||
.get_state_mut()
|
||||
.update_awaiting_download_status(reason);
|
||||
return Ok(());
|
||||
BlockProcessingResult::ParentUnknown { parent_root } => {
|
||||
// `BlockError::ParentUnknown` is only returned when processing blocks. Revert the
|
||||
// block request to `Downloaded` and park this lookup until the parent resolves; a
|
||||
// future call to `continue_requests` will re-submit the block for processing once
|
||||
// the parent lookup completes.
|
||||
self.block_request.state.revert_to_awaiting_processing()?;
|
||||
self.set_awaiting_parent(parent_root);
|
||||
return Ok(LookupResult::ParentUnknown {
|
||||
parent_root,
|
||||
block_root: self.block_root,
|
||||
peers: self.all_peers(),
|
||||
});
|
||||
}
|
||||
BlockProcessingResult::Error { penalty, .. } => {
|
||||
let peers = self.block_request.state.on_processing_failure()?;
|
||||
if let Some((action, whom, msg)) = penalty {
|
||||
whom.apply(action, &peers, msg, cx);
|
||||
}
|
||||
}
|
||||
|
||||
// Otherwise, attempt to progress awaiting processing
|
||||
// If this request is awaiting a parent lookup to be processed, do not send for processing.
|
||||
// The request will be rejected with unknown parent error.
|
||||
} else if !awaiting_parent {
|
||||
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
|
||||
// useful to conditionally access the result data.
|
||||
if let Some(result) = request.get_state_mut().maybe_start_processing() {
|
||||
// Lookup sync event safety: If `send_for_processing` returns Ok() we are guaranteed
|
||||
// that `BlockLookups::on_processing_result` will be called exactly once with this
|
||||
// lookup_id
|
||||
return R::send_for_processing(id, result, cx);
|
||||
}
|
||||
// Lookup sync event safety: If the request is not in `AwaitingDownload` or
|
||||
// `AwaitingProcessing` state it is guaranteed to receive some event to make progress.
|
||||
}
|
||||
self.continue_requests(cx)
|
||||
}
|
||||
|
||||
// Lookup sync event safety: If a lookup is awaiting a parent we are guaranteed to either:
|
||||
// (1) attempt to make progress with `BlockLookups::continue_child_lookups` if the parent
|
||||
// lookup completes, or (2) get dropped if the parent fails and is dropped.
|
||||
/// Handle data processing result
|
||||
pub fn on_data_processing_result(
|
||||
&mut self,
|
||||
result: BlockProcessingResult,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
let DataRequest::Request { state, .. } = &mut self.data_request else {
|
||||
return Err(LookupRequestError::BadState("no data_request".to_owned()));
|
||||
};
|
||||
|
||||
Ok(())
|
||||
match result {
|
||||
BlockProcessingResult::Imported(_fully_imported, _info) => {
|
||||
state.on_processing_success()?;
|
||||
}
|
||||
BlockProcessingResult::ParentUnknown { .. } => {
|
||||
return Err(LookupRequestError::BadState(
|
||||
"data processing returned ParentUnknown".to_owned(),
|
||||
));
|
||||
}
|
||||
BlockProcessingResult::Error { penalty, .. } => {
|
||||
let peers = state.on_processing_failure()?;
|
||||
if let Some((action, whom, msg)) = penalty {
|
||||
whom.apply(action, &peers, msg, cx);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.continue_requests(cx)
|
||||
}
|
||||
|
||||
/// Handle a block download response. Updates download state and advances the lookup.
|
||||
pub fn on_block_download_response(
|
||||
&mut self,
|
||||
req_id: ReqId,
|
||||
result: BlockDownloadResponse<T::EthSpec>,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
self.block_request
|
||||
.state
|
||||
.on_download_response(req_id, result)?;
|
||||
self.continue_requests(cx)
|
||||
}
|
||||
|
||||
/// Handle a custody columns download response. Updates download state and advances the lookup.
|
||||
pub fn on_custody_download_response(
|
||||
&mut self,
|
||||
req_id: ReqId,
|
||||
result: CustodyDownloadResponse<T::EthSpec>,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupResult, LookupRequestError> {
|
||||
let DataRequest::Request { state, .. } = &mut self.data_request else {
|
||||
return Err(LookupRequestError::BadState("no data_request".to_owned()));
|
||||
};
|
||||
|
||||
state.on_download_response(req_id, result)?;
|
||||
self.continue_requests(cx)
|
||||
}
|
||||
|
||||
/// Get all unique peers that claim to have imported this set of block components
|
||||
@@ -340,7 +385,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
}
|
||||
|
||||
/// Add peer to all request states. The peer must be able to serve this request.
|
||||
/// Returns true if the peer was newly inserted into some request state.
|
||||
/// Returns true if the peer was newly inserted into any peer set.
|
||||
pub fn add_peer(&mut self, peer_id: PeerId) -> bool {
|
||||
self.peers.write().insert(peer_id)
|
||||
}
|
||||
@@ -356,52 +401,23 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// The state of the custody request component of a `SingleBlockLookup`.
|
||||
#[derive(Educe)]
|
||||
#[educe(Debug)]
|
||||
pub struct CustodyRequestState<E: EthSpec> {
|
||||
#[educe(Debug(ignore))]
|
||||
pub block_root: Hash256,
|
||||
pub slot: Slot,
|
||||
pub state: SingleLookupRequestState<DataColumnSidecarList<E>>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> CustodyRequestState<E> {
|
||||
pub fn new(block_root: Hash256, slot: Slot) -> Self {
|
||||
Self {
|
||||
block_root,
|
||||
slot,
|
||||
state: SingleLookupRequestState::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The state of the block request component of a `SingleBlockLookup`.
|
||||
#[derive(Educe)]
|
||||
#[educe(Debug)]
|
||||
pub struct BlockRequestState<E: EthSpec> {
|
||||
#[educe(Debug(ignore))]
|
||||
pub requested_block_root: Hash256,
|
||||
pub state: SingleLookupRequestState<Arc<SignedBeaconBlock<E>>>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> BlockRequestState<E> {
|
||||
pub fn new(block_root: Hash256) -> Self {
|
||||
Self {
|
||||
requested_block_root: block_root,
|
||||
state: SingleLookupRequestState::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DownloadResult<T: Clone> {
|
||||
pub value: T,
|
||||
pub block_root: Hash256,
|
||||
pub seen_timestamp: Duration,
|
||||
pub peer_group: PeerGroup,
|
||||
}
|
||||
|
||||
impl<T: Clone> DownloadResult<T> {
|
||||
pub fn new(value: T, peer_group: PeerGroup, seen_timestamp: Duration) -> Self {
|
||||
Self {
|
||||
value,
|
||||
seen_timestamp,
|
||||
peer_group,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(IntoStaticStr)]
|
||||
pub enum State<T: Clone> {
|
||||
AwaitingDownload(/* reason */ &'static str),
|
||||
@@ -410,7 +426,7 @@ pub enum State<T: Clone> {
|
||||
/// Request is processing, sent by lookup sync
|
||||
Processing(DownloadResult<T>),
|
||||
/// Request is processed
|
||||
Processed(/* reason */ &'static str),
|
||||
Processed(/* reason */ &'static str, T),
|
||||
}
|
||||
|
||||
/// Object representing the state of a single block or blob lookup request.
|
||||
@@ -477,10 +493,29 @@ impl<T: Clone> SingleLookupRequestState<T> {
|
||||
State::Downloading { .. } => None,
|
||||
State::AwaitingProcess(result) => Some(&result.value),
|
||||
State::Processing(result) => Some(&result.value),
|
||||
State::Processed { .. } => None,
|
||||
State::Processed(_, value) => Some(value),
|
||||
}
|
||||
}
|
||||
|
||||
/// Drive download: if the request is awaiting download, issue it and apply the result.
|
||||
fn maybe_start_downloading(
|
||||
&mut self,
|
||||
request_fn: impl FnOnce() -> Result<LookupRequestResult<T>, RpcRequestSendError>,
|
||||
) -> Result<(), LookupRequestError> {
|
||||
if self.is_awaiting_download() {
|
||||
match request_fn().map_err(LookupRequestError::SendFailedNetwork)? {
|
||||
LookupRequestResult::RequestSent(req_id) => self.on_download_start(req_id)?,
|
||||
LookupRequestResult::NoRequestNeeded(reason, value) => {
|
||||
self.on_completed_request(reason, value)?
|
||||
}
|
||||
LookupRequestResult::Pending(reason) => {
|
||||
self.update_awaiting_download_status(reason)
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Switch to `AwaitingProcess` if the request is in `AwaitingDownload` state, otherwise
|
||||
/// ignore.
|
||||
pub fn insert_verified_response(&mut self, result: DownloadResult<T>) -> bool {
|
||||
@@ -513,6 +548,17 @@ impl<T: Clone> SingleLookupRequestState<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn on_download_response(
|
||||
&mut self,
|
||||
req_id: ReqId,
|
||||
result: Result<DownloadResult<T>, RpcResponseError>,
|
||||
) -> Result<(), LookupRequestError> {
|
||||
match result {
|
||||
Ok(result) => self.on_download_success(req_id, result),
|
||||
Err(_) => self.on_download_failure(req_id),
|
||||
}
|
||||
}
|
||||
|
||||
/// Registers a failure in downloading a block. This might be a peer disconnection or a wrong
|
||||
/// block.
|
||||
pub fn on_download_failure(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> {
|
||||
@@ -525,6 +571,10 @@ impl<T: Clone> SingleLookupRequestState<T> {
|
||||
});
|
||||
}
|
||||
self.failed_downloading = self.failed_downloading.saturating_add(1);
|
||||
if self.failed_downloading >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS {
|
||||
return Err(LookupRequestError::TooManyAttempts);
|
||||
}
|
||||
|
||||
self.state = State::AwaitingDownload("not started");
|
||||
Ok(())
|
||||
}
|
||||
@@ -589,6 +639,9 @@ impl<T: Clone> SingleLookupRequestState<T> {
|
||||
State::Processing(result) => {
|
||||
let peers_source = result.peer_group.clone();
|
||||
self.failed_processing = self.failed_processing.saturating_add(1);
|
||||
if self.failed_processing >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS {
|
||||
return Err(LookupRequestError::TooManyAttempts);
|
||||
}
|
||||
self.state = State::AwaitingDownload("not started");
|
||||
Ok(peers_source)
|
||||
}
|
||||
@@ -600,8 +653,8 @@ impl<T: Clone> SingleLookupRequestState<T> {
|
||||
|
||||
pub fn on_processing_success(&mut self) -> Result<(), LookupRequestError> {
|
||||
match &self.state {
|
||||
State::Processing(_) => {
|
||||
self.state = State::Processed("processing success");
|
||||
State::Processing(data) => {
|
||||
self.state = State::Processed("processing success", data.value.clone());
|
||||
Ok(())
|
||||
}
|
||||
other => Err(LookupRequestError::BadState(format!(
|
||||
@@ -611,10 +664,14 @@ impl<T: Clone> SingleLookupRequestState<T> {
|
||||
}
|
||||
|
||||
/// Mark a request as complete without any download or processing
|
||||
pub fn on_completed_request(&mut self, reason: &'static str) -> Result<(), LookupRequestError> {
|
||||
pub fn on_completed_request(
|
||||
&mut self,
|
||||
reason: &'static str,
|
||||
value: T,
|
||||
) -> Result<(), LookupRequestError> {
|
||||
match &self.state {
|
||||
State::AwaitingDownload { .. } => {
|
||||
self.state = State::Processed(reason);
|
||||
self.state = State::Processed(reason, value);
|
||||
Ok(())
|
||||
}
|
||||
other => Err(LookupRequestError::BadState(format!(
|
||||
@@ -622,15 +679,6 @@ impl<T: Clone> SingleLookupRequestState<T> {
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
/// The total number of failures, whether it be processing or downloading.
|
||||
pub fn failed_attempts(&self) -> u8 {
|
||||
self.failed_processing + self.failed_downloading
|
||||
}
|
||||
|
||||
pub fn more_failed_processing_attempts(&self) -> bool {
|
||||
self.failed_processing >= self.failed_downloading
|
||||
}
|
||||
}
|
||||
|
||||
// Display is used in the BadState assertions above
|
||||
@@ -647,15 +695,15 @@ impl<T: Clone> std::fmt::Debug for State<T> {
|
||||
match self {
|
||||
Self::AwaitingDownload(reason) => write!(f, "AwaitingDownload({})", reason),
|
||||
Self::Downloading(req_id) => write!(f, "Downloading({:?})", req_id),
|
||||
Self::AwaitingProcess(d) => write!(f, "AwaitingProcess({:?})", d.peer_group),
|
||||
Self::Processing(d) => write!(f, "Processing({:?})", d.peer_group),
|
||||
Self::Processed(reason) => write!(f, "Processed({})", reason),
|
||||
Self::AwaitingProcess(_) => write!(f, "AwaitingProcess"),
|
||||
Self::Processing(_) => write!(f, "Processing"),
|
||||
Self::Processed(reason, _) => write!(f, "Processed({})", reason),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn fmt_peer_set_as_len(
|
||||
peer_set: &Arc<RwLock<HashSet<PeerId>>>,
|
||||
peer_set: &PeerSet,
|
||||
f: &mut std::fmt::Formatter,
|
||||
) -> Result<(), std::fmt::Error> {
|
||||
write!(f, "{}", peer_set.read().len())
|
||||
|
||||
@@ -45,9 +45,7 @@ use crate::network_beacon_processor::{
|
||||
};
|
||||
use crate::service::NetworkMessage;
|
||||
use crate::status::ToStatusMessage;
|
||||
use crate::sync::block_lookups::{
|
||||
BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult,
|
||||
};
|
||||
use crate::sync::block_lookups::{BlockComponent, DownloadResult};
|
||||
use crate::sync::custody_backfill_sync::CustodyBackFillSync;
|
||||
use crate::sync::network_context::{PeerGroup, RpcResponseResult};
|
||||
use beacon_chain::block_verification_types::AsBlock;
|
||||
@@ -867,7 +865,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
block_slot,
|
||||
BlockComponent::Block(DownloadResult {
|
||||
value: block.block_cloned(),
|
||||
block_root,
|
||||
seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(),
|
||||
peer_group: PeerGroup::from_single(peer_id),
|
||||
}),
|
||||
@@ -885,7 +882,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
block_root,
|
||||
parent_root,
|
||||
slot,
|
||||
BlockComponent::Sidecar { parent_root },
|
||||
BlockComponent::Sidecar,
|
||||
);
|
||||
}
|
||||
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_root) => {
|
||||
@@ -975,6 +972,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
if self.block_lookups.search_child_and_parent(
|
||||
block_root,
|
||||
block_component,
|
||||
parent_root,
|
||||
peer_id,
|
||||
&mut self.network,
|
||||
) {
|
||||
@@ -1125,14 +1123,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
block: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
) {
|
||||
if let Some(resp) = self.network.on_single_block_response(id, peer_id, block) {
|
||||
self.block_lookups
|
||||
.on_download_response::<BlockRequestState<T::EthSpec>>(
|
||||
id,
|
||||
resp.map(|(value, seen_timestamp)| {
|
||||
(value, PeerGroup::from_single(peer_id), seen_timestamp)
|
||||
}),
|
||||
&mut self.network,
|
||||
)
|
||||
self.block_lookups.on_block_download_response(
|
||||
id,
|
||||
resp.map(|(value, seen_timestamp)| {
|
||||
DownloadResult::new(value, PeerGroup::from_single(peer_id), seen_timestamp)
|
||||
}),
|
||||
&mut self.network,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1308,11 +1305,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
response: CustodyByRootResult<T::EthSpec>,
|
||||
) {
|
||||
self.block_lookups
|
||||
.on_download_response::<CustodyRequestState<T::EthSpec>>(
|
||||
requester.0,
|
||||
response,
|
||||
&mut self.network,
|
||||
);
|
||||
.on_custody_download_response(requester.0, response, &mut self.network);
|
||||
}
|
||||
|
||||
/// Handles receiving a response for a range sync request that should have both blocks and
|
||||
|
||||
@@ -16,7 +16,7 @@ use crate::network_beacon_processor::TestBeaconChainType;
|
||||
use crate::service::NetworkMessage;
|
||||
use crate::status::ToStatusMessage;
|
||||
use crate::sync::batch::ByRangeRequestType;
|
||||
use crate::sync::block_lookups::SingleLookupId;
|
||||
use crate::sync::block_lookups::{DownloadResult, SingleLookupId};
|
||||
use crate::sync::block_sidecar_coupling::CouplingError;
|
||||
use crate::sync::range_data_column_batch_request::RangeDataColumnBatchRequest;
|
||||
use beacon_chain::block_verification_types::LookupBlock;
|
||||
@@ -95,7 +95,7 @@ pub type RpcResponseResult<T> = Result<(T, Duration), RpcResponseError>;
|
||||
|
||||
/// Duration = latest seen timestamp of all received data columns
|
||||
pub type CustodyByRootResult<T> =
|
||||
Result<(DataColumnSidecarList<T>, PeerGroup, Duration), RpcResponseError>;
|
||||
Result<DownloadResult<DataColumnSidecarList<T>>, RpcResponseError>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum RpcResponseError {
|
||||
@@ -176,13 +176,13 @@ impl PeerGroup {
|
||||
/// Sequential ID that uniquely identifies ReqResp outgoing requests
|
||||
pub type ReqId = u32;
|
||||
|
||||
pub enum LookupRequestResult<I = ReqId> {
|
||||
pub enum LookupRequestResult<T, I = ReqId> {
|
||||
/// A request is sent. Sync MUST receive an event from the network in the future for either:
|
||||
/// completed response or failed request
|
||||
RequestSent(I),
|
||||
/// No request is sent, and no further action is necessary to consider this request completed.
|
||||
/// Includes a reason why this request is not needed.
|
||||
NoRequestNeeded(&'static str),
|
||||
NoRequestNeeded(&'static str, T),
|
||||
/// No request is sent, but the request is not completed. Sync MUST receive some future event
|
||||
/// that makes progress on the request. For example: request is processing from a different
|
||||
/// source (i.e. block received from gossip) and sync MUST receive an event with that processing
|
||||
@@ -820,7 +820,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
lookup_id: SingleLookupId,
|
||||
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||
block_root: Hash256,
|
||||
) -> Result<LookupRequestResult, RpcRequestSendError> {
|
||||
) -> Result<LookupRequestResult<Arc<SignedBeaconBlock<T::EthSpec>>>, RpcRequestSendError> {
|
||||
let active_request_count_by_peer = self.active_request_count_by_peer();
|
||||
let Some(peer_id) = lookup_peers
|
||||
.read()
|
||||
@@ -871,9 +871,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
},
|
||||
// Block is fully validated. If it's not yet imported it's waiting for missing block
|
||||
// components. Consider this request completed and do nothing.
|
||||
BlockProcessStatus::ExecutionValidated { .. } => {
|
||||
BlockProcessStatus::ExecutionValidated(block) => {
|
||||
return Ok(LookupRequestResult::NoRequestNeeded(
|
||||
"block execution validated",
|
||||
block,
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -937,12 +938,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
lookup_id: SingleLookupId,
|
||||
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||
block_root: Hash256,
|
||||
) -> Result<LookupRequestResult, RpcRequestSendError> {
|
||||
) -> Result<LookupRequestResult<()>, RpcRequestSendError> {
|
||||
// Skip the download if fork-choice already saw this envelope (e.g. imported via gossip
|
||||
// before the lookup got here).
|
||||
if self.chain.envelope_is_known_to_fork_choice(&block_root) {
|
||||
return Ok(LookupRequestResult::NoRequestNeeded(
|
||||
"envelope already known to fork-choice",
|
||||
(),
|
||||
));
|
||||
}
|
||||
|
||||
@@ -1011,7 +1013,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
peer_id: PeerId,
|
||||
request: DataColumnsByRootSingleBlockRequest,
|
||||
expect_max_responses: bool,
|
||||
) -> Result<LookupRequestResult<DataColumnsByRootRequestId>, &'static str> {
|
||||
) -> Result<LookupRequestResult<(), DataColumnsByRootRequestId>, &'static str> {
|
||||
let id = DataColumnsByRootRequestId {
|
||||
id: self.next_id(),
|
||||
requester,
|
||||
@@ -1060,7 +1062,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
block_root: Hash256,
|
||||
block_slot: Slot,
|
||||
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||
) -> Result<LookupRequestResult, RpcRequestSendError> {
|
||||
) -> Result<LookupRequestResult<DataColumnSidecarList<T::EthSpec>>, RpcRequestSendError> {
|
||||
let custody_indexes_imported = self
|
||||
.chain
|
||||
.cached_data_column_indexes(&block_root, block_slot)
|
||||
@@ -1078,7 +1080,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
|
||||
if custody_indexes_to_fetch.is_empty() {
|
||||
// No indexes required, do not issue any request
|
||||
return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch"));
|
||||
return Ok(LookupRequestResult::NoRequestNeeded(
|
||||
"no indices to fetch",
|
||||
vec![],
|
||||
));
|
||||
}
|
||||
|
||||
let id = SingleLookupReqId {
|
||||
@@ -1528,8 +1533,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
// Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to
|
||||
// an Option first to use in an `if let Some() { act on result }` block.
|
||||
match result.as_ref() {
|
||||
Some(Ok((columns, peer_group, _))) => {
|
||||
debug!(?id, count = columns.len(), peers = ?peer_group, "Custody request success, removing")
|
||||
Some(Ok(data)) => {
|
||||
debug!(?id, count = data.value.len(), peers = ?data.peer_group, "Custody request success, removing")
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
debug!(?id, error = ?e, "Custody request failure, removing" )
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::sync::block_lookups::DownloadResult;
|
||||
use crate::sync::network_context::{
|
||||
DataColumnsByRootRequestId, DataColumnsByRootSingleBlockRequest,
|
||||
};
|
||||
@@ -56,8 +57,7 @@ struct ActiveBatchColumnsRequest {
|
||||
span: Span,
|
||||
}
|
||||
|
||||
pub type CustodyRequestResult<E> =
|
||||
Result<Option<(DataColumnSidecarList<E>, PeerGroup, Duration)>, Error>;
|
||||
pub type CustodyRequestResult<E> = Result<Option<DownloadResult<DataColumnSidecarList<E>>>, Error>;
|
||||
|
||||
impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
|
||||
pub(crate) fn new(
|
||||
@@ -227,7 +227,11 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
|
||||
.into_iter()
|
||||
.max()
|
||||
.unwrap_or_else(|| cx.chain.slot_clock.now_duration().unwrap_or_default());
|
||||
return Ok(Some((columns, peer_group, max_seen_timestamp)));
|
||||
return Ok(Some(DownloadResult::new(
|
||||
columns,
|
||||
peer_group,
|
||||
max_seen_timestamp,
|
||||
)));
|
||||
}
|
||||
|
||||
let active_request_count_by_peer = cx.active_request_count_by_peer();
|
||||
@@ -343,7 +347,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
|
||||
},
|
||||
);
|
||||
}
|
||||
LookupRequestResult::NoRequestNeeded(_) => unreachable!(),
|
||||
LookupRequestResult::NoRequestNeeded(..) => unreachable!(),
|
||||
LookupRequestResult::Pending(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user