mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-07 16:55:46 +00:00
clean up everything add child component, fix peer scoring and retry logic
This commit is contained in:
@@ -5,8 +5,10 @@ use crate::eth1_finalization_cache::Eth1FinalizationData;
|
|||||||
use crate::{data_availability_checker, GossipVerifiedBlock, PayloadVerificationOutcome};
|
use crate::{data_availability_checker, GossipVerifiedBlock, PayloadVerificationOutcome};
|
||||||
use derivative::Derivative;
|
use derivative::Derivative;
|
||||||
use ssz_derive::{Decode, Encode};
|
use ssz_derive::{Decode, Encode};
|
||||||
|
use ssz_types::VariableList;
|
||||||
use state_processing::ConsensusContext;
|
use state_processing::ConsensusContext;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use types::blob_sidecar::FixedBlobSidecarList;
|
||||||
use types::{
|
use types::{
|
||||||
blob_sidecar::BlobIdentifier, ssz_tagged_beacon_state, ssz_tagged_signed_beacon_block,
|
blob_sidecar::BlobIdentifier, ssz_tagged_beacon_state, ssz_tagged_signed_beacon_block,
|
||||||
ssz_tagged_signed_beacon_block_arc,
|
ssz_tagged_signed_beacon_block_arc,
|
||||||
@@ -72,6 +74,22 @@ impl<E: EthSpec> RpcBlock<E> {
|
|||||||
Ok(Self { block: inner })
|
Ok(Self { block: inner })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn new_from_fixed(
|
||||||
|
block: Arc<SignedBeaconBlock<E>>,
|
||||||
|
blobs: FixedBlobSidecarList<E>,
|
||||||
|
) -> Result<Self, AvailabilityCheckError> {
|
||||||
|
let filtered = blobs
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|b| b.clone())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let blobs = if filtered.is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(VariableList::from(filtered))
|
||||||
|
};
|
||||||
|
Self::new(block, blobs)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn deconstruct(self) -> (Arc<SignedBeaconBlock<E>>, Option<BlobSidecarList<E>>) {
|
pub fn deconstruct(self) -> (Arc<SignedBeaconBlock<E>>, Option<BlobSidecarList<E>>) {
|
||||||
match self.block {
|
match self.block {
|
||||||
RpcBlockInner::Block(block) => (block, None),
|
RpcBlockInner::Block(block) => (block, None),
|
||||||
|
|||||||
@@ -434,6 +434,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
seen_timestamp: Duration,
|
seen_timestamp: Duration,
|
||||||
process_type: BlockProcessType,
|
process_type: BlockProcessType,
|
||||||
) -> Result<(), Error<T::EthSpec>> {
|
) -> Result<(), Error<T::EthSpec>> {
|
||||||
|
let blob_count = blobs.iter().filter(|b| b.is_some()).count();
|
||||||
|
if blob_count == 0 {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
let process_fn = self.clone().generate_rpc_blobs_process_fn(
|
let process_fn = self.clone().generate_rpc_blobs_process_fn(
|
||||||
block_root,
|
block_root,
|
||||||
blobs,
|
blobs,
|
||||||
|
|||||||
@@ -318,6 +318,7 @@ impl TestRig {
|
|||||||
}
|
}
|
||||||
pub fn enqueue_single_lookup_rpc_blobs(&self) {
|
pub fn enqueue_single_lookup_rpc_blobs(&self) {
|
||||||
if let Some(blobs) = self.next_blobs.clone() {
|
if let Some(blobs) = self.next_blobs.clone() {
|
||||||
|
dbg!(blobs.len());
|
||||||
let blobs = FixedBlobSidecarList::from(
|
let blobs = FixedBlobSidecarList::from(
|
||||||
blobs
|
blobs
|
||||||
.into_iter()
|
.into_iter()
|
||||||
@@ -1003,6 +1004,7 @@ async fn test_rpc_block_reprocessing() {
|
|||||||
|
|
||||||
rig.enqueue_single_lookup_rpc_blobs();
|
rig.enqueue_single_lookup_rpc_blobs();
|
||||||
if rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0) > 0 {
|
if rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0) > 0 {
|
||||||
|
dbg!("here");
|
||||||
rig.assert_event_journal(&[RPC_BLOBS, WORKER_FREED, NOTHING_TO_DO])
|
rig.assert_event_journal(&[RPC_BLOBS, WORKER_FREED, NOTHING_TO_DO])
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|||||||
448
beacon_node/network/src/sync/block_lookups/common.rs
Normal file
448
beacon_node/network/src/sync/block_lookups/common.rs
Normal file
@@ -0,0 +1,448 @@
|
|||||||
|
use crate::sync::block_lookups::parent_lookup::PARENT_FAIL_TOLERANCE;
|
||||||
|
use crate::sync::block_lookups::single_block_lookup::{
|
||||||
|
LookupRequestError, LookupVerifyError, SingleBlockLookup, SingleLookupRequestState, State,
|
||||||
|
};
|
||||||
|
use crate::sync::block_lookups::{
|
||||||
|
BlobRequestState, BlockLookups, BlockRequestState, PeerShouldHave,
|
||||||
|
SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
|
||||||
|
};
|
||||||
|
use crate::sync::manager::{BlockProcessType, Id, SingleLookupReqId};
|
||||||
|
use crate::sync::network_context::SyncNetworkContext;
|
||||||
|
use crate::sync::ChildComponents;
|
||||||
|
use beacon_chain::block_verification_types::RpcBlock;
|
||||||
|
use beacon_chain::{get_block_root, BeaconChainTypes};
|
||||||
|
use lighthouse_network::rpc::methods::BlobsByRootRequest;
|
||||||
|
use lighthouse_network::rpc::BlocksByRootRequest;
|
||||||
|
use lighthouse_network::PeerId;
|
||||||
|
use rand::prelude::IteratorRandom;
|
||||||
|
use ssz_types::VariableList;
|
||||||
|
use std::ops::IndexMut;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
use types::blob_sidecar::FixedBlobSidecarList;
|
||||||
|
use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock};
|
||||||
|
|
||||||
|
#[derive(Debug, Copy, Clone)]
|
||||||
|
pub enum ResponseType {
|
||||||
|
Block,
|
||||||
|
Blob,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Copy, Clone)]
|
||||||
|
pub enum LookupType {
|
||||||
|
Current,
|
||||||
|
Parent,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This trait helps differentiate `SingleBlockLookup`s from `ParentLookup`s .This is useful in
|
||||||
|
/// ensuring requests and responses are handled separately and enables us to use different failure
|
||||||
|
/// tolerances for each, while re-using the same basic request and retry logic.
|
||||||
|
pub trait Lookup {
|
||||||
|
const MAX_ATTEMPTS: u8;
|
||||||
|
fn lookup_type() -> LookupType;
|
||||||
|
fn max_attempts() -> u8 {
|
||||||
|
Self::MAX_ATTEMPTS
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A `Lookup` that is a part of a `ParentLookup`.
|
||||||
|
pub struct Parent;
|
||||||
|
|
||||||
|
impl Lookup for Parent {
|
||||||
|
const MAX_ATTEMPTS: u8 = PARENT_FAIL_TOLERANCE;
|
||||||
|
fn lookup_type() -> LookupType {
|
||||||
|
LookupType::Parent
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A `Lookup` that part of a single block lookup.
|
||||||
|
pub struct Current;
|
||||||
|
|
||||||
|
impl Lookup for Current {
|
||||||
|
const MAX_ATTEMPTS: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS;
|
||||||
|
fn lookup_type() -> LookupType {
|
||||||
|
LookupType::Current
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This trait unifies common single block lookup functionality across blocks and blobs. This
|
||||||
|
/// includes making requests, verifying responses, and handling processing results. A
|
||||||
|
/// `SingleBlockLookup` includes both a `BlockRequestState` and a `BlobRequestState`, this trait is
|
||||||
|
/// implemented for each.
|
||||||
|
///
|
||||||
|
/// The use of the `ResponseType` associated type gives us a degree of type
|
||||||
|
/// safety when handling a block/blob response ensuring we only mutate the correct corresponding
|
||||||
|
/// state.
|
||||||
|
pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
|
||||||
|
/// The type of the request .
|
||||||
|
type RequestType;
|
||||||
|
|
||||||
|
/// A block or blob response.
|
||||||
|
type ResponseType;
|
||||||
|
|
||||||
|
/// The type created after validation.
|
||||||
|
type VerifiedResponseType: Clone;
|
||||||
|
|
||||||
|
/// We convert a `VerifiedResponseType` to this type prior to sending it to the beacon processor.
|
||||||
|
type ReconstructedResponseType;
|
||||||
|
|
||||||
|
/* Request building methods */
|
||||||
|
|
||||||
|
/// Construct a new request.
|
||||||
|
fn build_request(&mut self) -> Result<(PeerId, Self::RequestType), LookupRequestError> {
|
||||||
|
debug_assert!(matches!(self.get_state().state, State::AwaitingDownload));
|
||||||
|
self.too_many_attempts()?;
|
||||||
|
let peer = self.get_peer()?;
|
||||||
|
let request = self.new_request();
|
||||||
|
self.get_state_mut().req_counter += 1;
|
||||||
|
Ok((peer, request))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Verify the current request has not exceeded the maximum number of attempts.
|
||||||
|
fn too_many_attempts(&self) -> Result<(), LookupRequestError> {
|
||||||
|
let max_attempts = L::max_attempts();
|
||||||
|
let request_state = self.get_state();
|
||||||
|
|
||||||
|
if request_state.failed_attempts() >= max_attempts {
|
||||||
|
let cannot_process =
|
||||||
|
request_state.failed_processing >= request_state.failed_downloading;
|
||||||
|
Err(LookupRequestError::TooManyAttempts { cannot_process })
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the next peer to request. Draws from the set of peers we think should have both the
|
||||||
|
/// block and blob first. If that fails, we draw from the set of peers that may have either.
|
||||||
|
fn get_peer(&mut self) -> Result<PeerId, LookupRequestError> {
|
||||||
|
let request_state = self.get_state_mut();
|
||||||
|
let available_peer_opt = request_state
|
||||||
|
.available_peers
|
||||||
|
.iter()
|
||||||
|
.choose(&mut rand::thread_rng())
|
||||||
|
.copied()
|
||||||
|
.map(PeerShouldHave::BlockAndBlobs);
|
||||||
|
|
||||||
|
let Some(peer_id) = available_peer_opt.or_else(||request_state
|
||||||
|
.potential_peers
|
||||||
|
.iter()
|
||||||
|
.choose(&mut rand::thread_rng())
|
||||||
|
.copied()
|
||||||
|
.map(PeerShouldHave::Neither)) else {
|
||||||
|
return Err(LookupRequestError::NoPeers);
|
||||||
|
};
|
||||||
|
request_state.used_peers.insert(peer_id.to_peer_id());
|
||||||
|
request_state.state = State::Downloading { peer_id };
|
||||||
|
Ok(peer_id.to_peer_id())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Initialize `Self::RequestType`.
|
||||||
|
fn new_request(&self) -> Self::RequestType;
|
||||||
|
|
||||||
|
/// Send the request to the network service.
|
||||||
|
fn make_request(
|
||||||
|
id: SingleLookupReqId,
|
||||||
|
peer_id: PeerId,
|
||||||
|
request: Self::RequestType,
|
||||||
|
cx: &SyncNetworkContext<T>,
|
||||||
|
) -> Result<(), LookupRequestError>;
|
||||||
|
|
||||||
|
/* Response handling methods */
|
||||||
|
|
||||||
|
/// Verify the response is valid based on what we requested.
|
||||||
|
fn verify_response(
|
||||||
|
&mut self,
|
||||||
|
expected_block_root: Hash256,
|
||||||
|
response: Option<Self::ResponseType>,
|
||||||
|
) -> Result<Option<Self::VerifiedResponseType>, LookupVerifyError> {
|
||||||
|
let request_state = self.get_state_mut();
|
||||||
|
match request_state.state {
|
||||||
|
State::AwaitingDownload => {
|
||||||
|
request_state.register_failure_downloading();
|
||||||
|
Err(LookupVerifyError::ExtraBlocksReturned)
|
||||||
|
}
|
||||||
|
State::Downloading { peer_id } => {
|
||||||
|
self.verify_response_inner(expected_block_root, response, peer_id)
|
||||||
|
}
|
||||||
|
State::Processing { peer_id: _ } => match response {
|
||||||
|
Some(_) => {
|
||||||
|
// We sent the block for processing and received an extra block.
|
||||||
|
request_state.register_failure_downloading();
|
||||||
|
Err(LookupVerifyError::ExtraBlocksReturned)
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// This is simply the stream termination and we are already processing the
|
||||||
|
// block
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The response verification unique to block or blobs.
|
||||||
|
fn verify_response_inner(
|
||||||
|
&mut self,
|
||||||
|
expected_block_root: Hash256,
|
||||||
|
response: Option<Self::ResponseType>,
|
||||||
|
peer_id: PeerShouldHave,
|
||||||
|
) -> Result<Option<Self::VerifiedResponseType>, LookupVerifyError>;
|
||||||
|
|
||||||
|
/// A getter for the parent root of the response. Returns an `Option` because we won't know
|
||||||
|
/// the blob parent if we don't end up getting any blobs in the response.
|
||||||
|
fn get_parent_root(verified_response: &Self::VerifiedResponseType) -> Option<Hash256>;
|
||||||
|
|
||||||
|
/// Caches the verified response in the lookup if necessary. This is only necessary for lookups
|
||||||
|
/// triggered by `UnknownParent` errors.
|
||||||
|
fn add_to_child_components(
|
||||||
|
verified_response: Self::VerifiedResponseType,
|
||||||
|
components: &mut ChildComponents<T::EthSpec>,
|
||||||
|
);
|
||||||
|
|
||||||
|
/// Convert a verified response to the type we send to the beacon processor.
|
||||||
|
fn verified_to_reconstructed(
|
||||||
|
verified: Self::VerifiedResponseType,
|
||||||
|
) -> Self::ReconstructedResponseType;
|
||||||
|
|
||||||
|
/// Send the response to the beacon processor.
|
||||||
|
fn send_for_processing(
|
||||||
|
id: Id,
|
||||||
|
bl: &BlockLookups<T>,
|
||||||
|
block_root: Hash256,
|
||||||
|
verified: Self::ReconstructedResponseType,
|
||||||
|
duration: Duration,
|
||||||
|
cx: &SyncNetworkContext<T>,
|
||||||
|
) -> Result<(), LookupRequestError>;
|
||||||
|
|
||||||
|
/// Remove the peer from the lookup if it is useless.
|
||||||
|
fn remove_if_useless(&mut self, peer: &PeerId) {
|
||||||
|
self.get_state_mut().remove_peer_if_useless(peer)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register a failure to process the block or blob.
|
||||||
|
fn register_failure_downloading(&mut self) {
|
||||||
|
self.get_state_mut().register_failure_downloading()
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Utility methods */
|
||||||
|
|
||||||
|
/// Returns the `ResponseType` associated with this trait implementation. Useful in logging.
|
||||||
|
fn response_type() -> ResponseType;
|
||||||
|
|
||||||
|
/// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait.
|
||||||
|
fn request_state_mut(request: &mut SingleBlockLookup<L, T>) -> &mut Self;
|
||||||
|
|
||||||
|
/// A getter for a reference to the `SingleLookupRequestState` associated with this trait.
|
||||||
|
fn get_state(&self) -> &SingleLookupRequestState;
|
||||||
|
|
||||||
|
/// A getter for a mutable reference to the SingleLookupRequestState associated with this trait.
|
||||||
|
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L> {
|
||||||
|
type RequestType = BlocksByRootRequest;
|
||||||
|
type ResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;
|
||||||
|
type VerifiedResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;
|
||||||
|
type ReconstructedResponseType = RpcBlock<T::EthSpec>;
|
||||||
|
|
||||||
|
fn new_request(&self) -> BlocksByRootRequest {
|
||||||
|
BlocksByRootRequest::new(VariableList::from(vec![self.requested_block_root]))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_request(
|
||||||
|
id: SingleLookupReqId,
|
||||||
|
peer_id: PeerId,
|
||||||
|
request: Self::RequestType,
|
||||||
|
cx: &SyncNetworkContext<T>,
|
||||||
|
) -> Result<(), LookupRequestError> {
|
||||||
|
cx.block_lookup_request(id, peer_id, request, L::lookup_type())
|
||||||
|
.map_err(LookupRequestError::SendFailed)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn verify_response_inner(
|
||||||
|
&mut self,
|
||||||
|
expected_block_root: Hash256,
|
||||||
|
response: Option<Self::ResponseType>,
|
||||||
|
peer_id: PeerShouldHave,
|
||||||
|
) -> Result<Option<Arc<SignedBeaconBlock<T::EthSpec>>>, LookupVerifyError> {
|
||||||
|
match response {
|
||||||
|
Some(block) => {
|
||||||
|
// Compute the block root using this specific function so that we can get timing
|
||||||
|
// metrics.
|
||||||
|
let block_root = get_block_root(&block);
|
||||||
|
if block_root != expected_block_root {
|
||||||
|
// return an error and drop the block
|
||||||
|
// NOTE: we take this is as a download failure to prevent counting the
|
||||||
|
// attempt as a chain failure, but simply a peer failure.
|
||||||
|
self.state.register_failure_downloading();
|
||||||
|
Err(LookupVerifyError::RootMismatch)
|
||||||
|
} else {
|
||||||
|
// Return the block for processing.
|
||||||
|
self.state.state = State::Processing { peer_id };
|
||||||
|
Ok(Some(block))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
if peer_id.should_have_block() {
|
||||||
|
self.state.register_failure_downloading();
|
||||||
|
Err(LookupVerifyError::NoBlockReturned)
|
||||||
|
} else {
|
||||||
|
self.state.state = State::AwaitingDownload;
|
||||||
|
Err(LookupVerifyError::BenignFailure)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_parent_root(verified_response: &Arc<SignedBeaconBlock<T::EthSpec>>) -> Option<Hash256> {
|
||||||
|
Some(verified_response.parent_root())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_to_child_components(
|
||||||
|
verified_response: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||||
|
components: &mut ChildComponents<T::EthSpec>,
|
||||||
|
) {
|
||||||
|
components.add_unknown_parent_block(verified_response);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn verified_to_reconstructed(
|
||||||
|
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||||
|
) -> RpcBlock<T::EthSpec> {
|
||||||
|
RpcBlock::new_without_blobs(block)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_for_processing(
|
||||||
|
id: Id,
|
||||||
|
bl: &BlockLookups<T>,
|
||||||
|
block_root: Hash256,
|
||||||
|
constructed: RpcBlock<T::EthSpec>,
|
||||||
|
duration: Duration,
|
||||||
|
cx: &SyncNetworkContext<T>,
|
||||||
|
) -> Result<(), LookupRequestError> {
|
||||||
|
bl.send_block_for_processing(
|
||||||
|
block_root,
|
||||||
|
constructed,
|
||||||
|
duration,
|
||||||
|
BlockProcessType::SingleBlock { id },
|
||||||
|
cx,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn response_type() -> ResponseType {
|
||||||
|
ResponseType::Block
|
||||||
|
}
|
||||||
|
fn request_state_mut(request: &mut SingleBlockLookup<L, T>) -> &mut Self {
|
||||||
|
&mut request.block_request_state
|
||||||
|
}
|
||||||
|
fn get_state(&self) -> &SingleLookupRequestState {
|
||||||
|
&self.state
|
||||||
|
}
|
||||||
|
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState {
|
||||||
|
&mut self.state
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L, T::EthSpec> {
|
||||||
|
type RequestType = BlobsByRootRequest;
|
||||||
|
type ResponseType = Arc<BlobSidecar<T::EthSpec>>;
|
||||||
|
type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>;
|
||||||
|
type ReconstructedResponseType = FixedBlobSidecarList<T::EthSpec>;
|
||||||
|
|
||||||
|
fn new_request(&self) -> BlobsByRootRequest {
|
||||||
|
BlobsByRootRequest {
|
||||||
|
blob_ids: VariableList::from(self.requested_ids.clone()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_request(
|
||||||
|
id: SingleLookupReqId,
|
||||||
|
peer_id: PeerId,
|
||||||
|
request: Self::RequestType,
|
||||||
|
cx: &SyncNetworkContext<T>,
|
||||||
|
) -> Result<(), LookupRequestError> {
|
||||||
|
cx.blob_lookup_request(id, peer_id, request, L::lookup_type())
|
||||||
|
.map_err(LookupRequestError::SendFailed)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn verify_response_inner(
|
||||||
|
&mut self,
|
||||||
|
_expected_block_root: Hash256,
|
||||||
|
blob: Option<Self::ResponseType>,
|
||||||
|
peer_id: PeerShouldHave,
|
||||||
|
) -> Result<Option<FixedBlobSidecarList<T::EthSpec>>, LookupVerifyError> {
|
||||||
|
match blob {
|
||||||
|
Some(blob) => {
|
||||||
|
let received_id = blob.id();
|
||||||
|
if !self.requested_ids.contains(&received_id) {
|
||||||
|
self.state.register_failure_downloading();
|
||||||
|
Err(LookupVerifyError::UnrequestedBlobId)
|
||||||
|
} else {
|
||||||
|
// State should remain downloading until we receive the stream terminator.
|
||||||
|
self.requested_ids.retain(|id| *id != received_id);
|
||||||
|
let blob_index = blob.index;
|
||||||
|
|
||||||
|
if blob_index >= T::EthSpec::max_blobs_per_block() as u64 {
|
||||||
|
return Err(LookupVerifyError::InvalidIndex(blob.index));
|
||||||
|
}
|
||||||
|
*self.blob_download_queue.index_mut(blob_index as usize) = Some(blob);
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
self.state.state = State::Processing { peer_id };
|
||||||
|
let blobs = std::mem::take(&mut self.blob_download_queue);
|
||||||
|
Ok(Some(blobs))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_parent_root(verified_response: &FixedBlobSidecarList<T::EthSpec>) -> Option<Hash256> {
|
||||||
|
verified_response
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|blob| blob.as_ref())
|
||||||
|
.map(|blob| blob.block_parent_root)
|
||||||
|
.next()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_to_child_components(
|
||||||
|
verified_response: FixedBlobSidecarList<T::EthSpec>,
|
||||||
|
components: &mut ChildComponents<T::EthSpec>,
|
||||||
|
) {
|
||||||
|
components.add_unknown_parent_blobs(verified_response);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn verified_to_reconstructed(
|
||||||
|
blobs: FixedBlobSidecarList<T::EthSpec>,
|
||||||
|
) -> FixedBlobSidecarList<T::EthSpec> {
|
||||||
|
blobs
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_for_processing(
|
||||||
|
id: Id,
|
||||||
|
bl: &BlockLookups<T>,
|
||||||
|
block_root: Hash256,
|
||||||
|
verified: FixedBlobSidecarList<T::EthSpec>,
|
||||||
|
duration: Duration,
|
||||||
|
cx: &SyncNetworkContext<T>,
|
||||||
|
) -> Result<(), LookupRequestError> {
|
||||||
|
bl.send_blobs_for_processing(
|
||||||
|
block_root,
|
||||||
|
verified,
|
||||||
|
duration,
|
||||||
|
BlockProcessType::SingleBlob { id },
|
||||||
|
cx,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn response_type() -> ResponseType {
|
||||||
|
ResponseType::Blob
|
||||||
|
}
|
||||||
|
fn request_state_mut(request: &mut SingleBlockLookup<L, T>) -> &mut Self {
|
||||||
|
&mut request.blob_request_state
|
||||||
|
}
|
||||||
|
fn get_state(&self) -> &SingleLookupRequestState {
|
||||||
|
&self.state
|
||||||
|
}
|
||||||
|
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState {
|
||||||
|
&mut self.state
|
||||||
|
}
|
||||||
|
}
|
||||||
File diff suppressed because it is too large
Load Diff
@@ -1,8 +1,7 @@
|
|||||||
use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup};
|
use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup};
|
||||||
use super::{DownloadedBlocks, PeerShouldHave};
|
use super::{DownloadedBlocks, PeerShouldHave};
|
||||||
use crate::sync::block_lookups::single_block_lookup::{
|
use crate::sync::block_lookups::common::Parent;
|
||||||
Parent, RequestState, State, UnknownParentComponents,
|
use crate::sync::block_lookups::common::RequestState;
|
||||||
};
|
|
||||||
use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext};
|
use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext};
|
||||||
use beacon_chain::block_verification_types::AsBlock;
|
use beacon_chain::block_verification_types::AsBlock;
|
||||||
use beacon_chain::block_verification_types::RpcBlock;
|
use beacon_chain::block_verification_types::RpcBlock;
|
||||||
@@ -55,7 +54,6 @@ pub enum RequestError {
|
|||||||
cannot_process: bool,
|
cannot_process: bool,
|
||||||
},
|
},
|
||||||
NoPeers,
|
NoPeers,
|
||||||
AlreadyDownloaded,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: BeaconChainTypes> ParentLookup<T> {
|
impl<T: BeaconChainTypes> ParentLookup<T> {
|
||||||
@@ -103,55 +101,42 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
|
|||||||
.map_err(Into::into)
|
.map_err(Into::into)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn check_block_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> {
|
pub fn check_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> {
|
||||||
self.current_parent_request
|
self.current_parent_request
|
||||||
.block_request_state
|
.block_request_state
|
||||||
.state
|
.state
|
||||||
.check_peer_disconnected(peer_id)
|
.check_peer_disconnected(peer_id)
|
||||||
}
|
.and_then(|()| {
|
||||||
|
self.current_parent_request
|
||||||
pub fn check_blob_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> {
|
.blob_request_state
|
||||||
self.current_parent_request
|
.state
|
||||||
.blob_request_state
|
.check_peer_disconnected(peer_id)
|
||||||
.state
|
})
|
||||||
.check_peer_disconnected(peer_id)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_unknown_parent_block(&mut self, block: RpcBlock<T::EthSpec>) {
|
pub fn add_unknown_parent_block(&mut self, block: RpcBlock<T::EthSpec>) {
|
||||||
let next_parent = block.parent_root();
|
let next_parent = block.parent_root();
|
||||||
|
|
||||||
// Cache the block.
|
// Cache the block.
|
||||||
let current_root = self
|
let current_root = self.current_parent_request.block_root();
|
||||||
.current_parent_request
|
|
||||||
.block_request_state
|
|
||||||
.requested_block_root;
|
|
||||||
self.downloaded_blocks.push((current_root, block));
|
self.downloaded_blocks.push((current_root, block));
|
||||||
|
|
||||||
// Update the block request.
|
// Update the parent request.
|
||||||
self.current_parent_request
|
self.current_parent_request
|
||||||
.block_request_state
|
.update_requested_parent_block(next_parent)
|
||||||
.requested_block_root = next_parent;
|
|
||||||
self.current_parent_request.block_request_state.state.state = State::AwaitingDownload;
|
|
||||||
|
|
||||||
// Update the blobs request.
|
|
||||||
self.current_parent_request.blob_request_state.state.state = State::AwaitingDownload;
|
|
||||||
|
|
||||||
// Reset the unknown parent components.
|
|
||||||
self.current_parent_request.unknown_parent_components =
|
|
||||||
Some(UnknownParentComponents::default());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn processing_peer(&self) -> Result<PeerShouldHave, ()> {
|
pub fn block_processing_peer(&self) -> Result<PeerShouldHave, ()> {
|
||||||
self.current_parent_request
|
self.current_parent_request
|
||||||
.block_request_state
|
.block_request_state
|
||||||
.state
|
.state
|
||||||
.processing_peer()
|
.processing_peer()
|
||||||
.or_else(|()| {
|
}
|
||||||
self.current_parent_request
|
|
||||||
.blob_request_state
|
pub fn blob_processing_peer(&self) -> Result<PeerShouldHave, ()> {
|
||||||
.state
|
self.current_parent_request
|
||||||
.processing_peer()
|
.blob_request_state
|
||||||
})
|
.state
|
||||||
|
.processing_peer()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Consumes the parent request and destructures it into it's parts.
|
/// Consumes the parent request and destructures it into it's parts.
|
||||||
@@ -193,11 +178,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
|
|||||||
.blob_request_state
|
.blob_request_state
|
||||||
.state
|
.state
|
||||||
.register_failure_processing();
|
.register_failure_processing();
|
||||||
if let Some(components) = self
|
if let Some(components) = self.current_parent_request.cached_child_components.as_mut() {
|
||||||
.current_parent_request
|
|
||||||
.unknown_parent_components
|
|
||||||
.as_mut()
|
|
||||||
{
|
|
||||||
components.downloaded_block = None;
|
components.downloaded_block = None;
|
||||||
components.downloaded_blobs = <_>::default();
|
components.downloaded_blobs = <_>::default();
|
||||||
}
|
}
|
||||||
@@ -209,11 +190,8 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
block: Option<R::ResponseType>,
|
block: Option<R::ResponseType>,
|
||||||
failed_chains: &mut lru_cache::LRUTimeCache<Hash256>,
|
failed_chains: &mut lru_cache::LRUTimeCache<Hash256>,
|
||||||
) -> Result<Option<(Hash256, R::VerifiedResponseType)>, ParentVerifyError> {
|
) -> Result<Option<R::VerifiedResponseType>, ParentVerifyError> {
|
||||||
let expected_block_root = self
|
let expected_block_root = self.current_parent_request.block_root();
|
||||||
.current_parent_request
|
|
||||||
.block_request_state
|
|
||||||
.requested_block_root;
|
|
||||||
let request_state = R::request_state_mut(&mut self.current_parent_request);
|
let request_state = R::request_state_mut(&mut self.current_parent_request);
|
||||||
let root_and_block = request_state.verify_response(expected_block_root, block)?;
|
let root_and_block = request_state.verify_response(expected_block_root, block)?;
|
||||||
|
|
||||||
@@ -221,7 +199,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
|
|||||||
// be dropped and the peer downscored.
|
// be dropped and the peer downscored.
|
||||||
if let Some(parent_root) = root_and_block
|
if let Some(parent_root) = root_and_block
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.and_then(|(_, block)| R::get_parent_root(block))
|
.and_then(|block| R::get_parent_root(block))
|
||||||
{
|
{
|
||||||
if failed_chains.contains(&parent_root) {
|
if failed_chains.contains(&parent_root) {
|
||||||
request_state.register_failure_downloading();
|
request_state.register_failure_downloading();
|
||||||
@@ -277,7 +255,6 @@ impl From<LookupRequestError> for RequestError {
|
|||||||
RequestError::TooManyAttempts { cannot_process }
|
RequestError::TooManyAttempts { cannot_process }
|
||||||
}
|
}
|
||||||
E::NoPeers => RequestError::NoPeers,
|
E::NoPeers => RequestError::NoPeers,
|
||||||
E::AlreadyDownloaded => RequestError::AlreadyDownloaded,
|
|
||||||
E::SendFailed(msg) => RequestError::SendFailed(msg),
|
E::SendFailed(msg) => RequestError::SendFailed(msg),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -306,7 +283,6 @@ impl RequestError {
|
|||||||
}
|
}
|
||||||
RequestError::TooManyAttempts { cannot_process: _ } => "too_many_downloading_attempts",
|
RequestError::TooManyAttempts { cannot_process: _ } => "too_many_downloading_attempts",
|
||||||
RequestError::NoPeers => "no_peers",
|
RequestError::NoPeers => "no_peers",
|
||||||
RequestError::AlreadyDownloaded => "already_downloaded",
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -1,12 +1,13 @@
|
|||||||
use crate::network_beacon_processor::NetworkBeaconProcessor;
|
use crate::network_beacon_processor::NetworkBeaconProcessor;
|
||||||
|
|
||||||
use crate::service::RequestId;
|
use crate::service::RequestId;
|
||||||
use crate::sync::manager::RequestId as SyncId;
|
use crate::sync::manager::{RequestId as SyncId, SingleLookupReqId};
|
||||||
use crate::NetworkMessage;
|
use crate::NetworkMessage;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
use crate::sync::block_lookups::common::ResponseType;
|
||||||
use beacon_chain::builder::Witness;
|
use beacon_chain::builder::Witness;
|
||||||
use beacon_chain::eth1_chain::CachingEth1Backend;
|
use beacon_chain::eth1_chain::CachingEth1Backend;
|
||||||
use beacon_chain::test_utils::{build_log, BeaconChainHarness, EphemeralHarnessType};
|
use beacon_chain::test_utils::{build_log, BeaconChainHarness, EphemeralHarnessType};
|
||||||
@@ -156,7 +157,7 @@ impl TestRig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[track_caller]
|
#[track_caller]
|
||||||
fn expect_block_request(&mut self, response_type: ResponseType) -> Id {
|
fn expect_block_request(&mut self, response_type: ResponseType) -> SingleLookupReqId {
|
||||||
match response_type {
|
match response_type {
|
||||||
ResponseType::Block => match self.network_rx.try_recv() {
|
ResponseType::Block => match self.network_rx.try_recv() {
|
||||||
Ok(NetworkMessage::SendRequest {
|
Ok(NetworkMessage::SendRequest {
|
||||||
@@ -182,7 +183,7 @@ impl TestRig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[track_caller]
|
#[track_caller]
|
||||||
fn expect_parent_request(&mut self, response_type: ResponseType) -> Id {
|
fn expect_parent_request(&mut self, response_type: ResponseType) -> SingleLookupReqId {
|
||||||
match response_type {
|
match response_type {
|
||||||
ResponseType::Block => match self.network_rx.try_recv() {
|
ResponseType::Block => match self.network_rx.try_recv() {
|
||||||
Ok(NetworkMessage::SendRequest {
|
Ok(NetworkMessage::SendRequest {
|
||||||
@@ -322,7 +323,7 @@ fn test_single_block_lookup_happy_path() {
|
|||||||
// after processing.
|
// after processing.
|
||||||
bl.single_lookup_response::<BlockRequestState<Current>>(id, peer_id, None, D, &cx);
|
bl.single_lookup_response::<BlockRequestState<Current>>(id, peer_id, None, D, &cx);
|
||||||
bl.single_block_component_processed::<BlockRequestState<Current>>(
|
bl.single_block_component_processed::<BlockRequestState<Current>>(
|
||||||
id,
|
id.id,
|
||||||
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)),
|
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)),
|
||||||
&mut cx,
|
&mut cx,
|
||||||
);
|
);
|
||||||
@@ -471,7 +472,7 @@ fn test_single_block_lookup_becomes_parent_request() {
|
|||||||
// Send the stream termination. Peer should have not been penalized, and the request moved to a
|
// Send the stream termination. Peer should have not been penalized, and the request moved to a
|
||||||
// parent request after processing.
|
// parent request after processing.
|
||||||
bl.single_block_component_processed::<BlockRequestState<Current>>(
|
bl.single_block_component_processed::<BlockRequestState<Current>>(
|
||||||
id,
|
id.id,
|
||||||
BlockError::ParentUnknown(block.into()).into(),
|
BlockError::ParentUnknown(block.into()).into(),
|
||||||
&mut cx,
|
&mut cx,
|
||||||
);
|
);
|
||||||
@@ -766,6 +767,14 @@ fn test_parent_lookup_too_many_attempts() {
|
|||||||
&cx,
|
&cx,
|
||||||
);
|
);
|
||||||
// Send the stream termination
|
// Send the stream termination
|
||||||
|
|
||||||
|
// Note, previously we would send the same lookup id with a stream terminator,
|
||||||
|
// we'd ignore it because we'd intrepret it as an unrequested response, since
|
||||||
|
// we already got one response for the block. I'm not sure what the intent is
|
||||||
|
// for having this stream terminator line in this test at all. Receiving an invalid
|
||||||
|
// block and a stream terminator with the same Id now results in two failed attempts,
|
||||||
|
// I'm unsure if this is how it should behave?
|
||||||
|
//
|
||||||
bl.parent_lookup_response::<BlockRequestState<Parent>>(id, peer_id, None, D, &cx);
|
bl.parent_lookup_response::<BlockRequestState<Parent>>(id, peer_id, None, D, &cx);
|
||||||
rig.expect_penalty();
|
rig.expect_penalty();
|
||||||
}
|
}
|
||||||
@@ -1051,7 +1060,7 @@ fn test_single_block_lookup_ignored_response() {
|
|||||||
bl.single_lookup_response::<BlockRequestState<Current>>(id, peer_id, None, D, &cx);
|
bl.single_lookup_response::<BlockRequestState<Current>>(id, peer_id, None, D, &cx);
|
||||||
// Send an Ignored response, the request should be dropped
|
// Send an Ignored response, the request should be dropped
|
||||||
bl.single_block_component_processed::<BlockRequestState<Current>>(
|
bl.single_block_component_processed::<BlockRequestState<Current>>(
|
||||||
id,
|
id.id,
|
||||||
BlockProcessingResult::Ignored,
|
BlockProcessingResult::Ignored,
|
||||||
&mut cx,
|
&mut cx,
|
||||||
);
|
);
|
||||||
@@ -1216,6 +1225,7 @@ fn test_same_chain_race_condition() {
|
|||||||
|
|
||||||
mod deneb_only {
|
mod deneb_only {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::sync::block_lookups::common::ResponseType;
|
||||||
use beacon_chain::blob_verification::BlobError;
|
use beacon_chain::blob_verification::BlobError;
|
||||||
use std::ops::IndexMut;
|
use std::ops::IndexMut;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
@@ -1229,10 +1239,10 @@ mod deneb_only {
|
|||||||
parent_block: Option<Arc<SignedBeaconBlock<E>>>,
|
parent_block: Option<Arc<SignedBeaconBlock<E>>>,
|
||||||
parent_blobs: Vec<Arc<BlobSidecar<E>>>,
|
parent_blobs: Vec<Arc<BlobSidecar<E>>>,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
block_req_id: Option<u32>,
|
block_req_id: Option<SingleLookupReqId>,
|
||||||
parent_block_req_id: Option<u32>,
|
parent_block_req_id: Option<SingleLookupReqId>,
|
||||||
blob_req_id: Option<u32>,
|
blob_req_id: Option<SingleLookupReqId>,
|
||||||
parent_blob_req_id: Option<u32>,
|
parent_blob_req_id: Option<SingleLookupReqId>,
|
||||||
slot: Slot,
|
slot: Slot,
|
||||||
block_root: Hash256,
|
block_root: Hash256,
|
||||||
}
|
}
|
||||||
@@ -1296,7 +1306,7 @@ mod deneb_only {
|
|||||||
block_root = child_root;
|
block_root = child_root;
|
||||||
bl.search_child_block(
|
bl.search_child_block(
|
||||||
child_root,
|
child_root,
|
||||||
Some(UnknownParentComponents::new(Some(child_block), None)),
|
Some(ChildComponents::new(Some(child_block), None)),
|
||||||
&[PeerShouldHave::Neither(peer_id)],
|
&[PeerShouldHave::Neither(peer_id)],
|
||||||
&mut cx,
|
&mut cx,
|
||||||
);
|
);
|
||||||
@@ -1334,7 +1344,7 @@ mod deneb_only {
|
|||||||
*blobs.index_mut(0) = Some(child_blob);
|
*blobs.index_mut(0) = Some(child_blob);
|
||||||
bl.search_child_block(
|
bl.search_child_block(
|
||||||
child_root,
|
child_root,
|
||||||
Some(UnknownParentComponents::new(None, Some(blobs))),
|
Some(ChildComponents::new(None, Some(blobs))),
|
||||||
&[PeerShouldHave::Neither(peer_id)],
|
&[PeerShouldHave::Neither(peer_id)],
|
||||||
&mut cx,
|
&mut cx,
|
||||||
);
|
);
|
||||||
@@ -1531,7 +1541,7 @@ mod deneb_only {
|
|||||||
// mean we do not send a new request.
|
// mean we do not send a new request.
|
||||||
self.bl
|
self.bl
|
||||||
.single_block_component_processed::<BlockRequestState<Current>>(
|
.single_block_component_processed::<BlockRequestState<Current>>(
|
||||||
self.block_req_id.expect("block request id"),
|
self.block_req_id.expect("block request id").id,
|
||||||
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(
|
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(
|
||||||
self.block_root,
|
self.block_root,
|
||||||
)),
|
)),
|
||||||
@@ -1578,7 +1588,7 @@ mod deneb_only {
|
|||||||
fn invalid_block_processed(mut self) -> Self {
|
fn invalid_block_processed(mut self) -> Self {
|
||||||
self.bl
|
self.bl
|
||||||
.single_block_component_processed::<BlockRequestState<Current>>(
|
.single_block_component_processed::<BlockRequestState<Current>>(
|
||||||
self.block_req_id.expect("block request id"),
|
self.block_req_id.expect("block request id").id,
|
||||||
BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid),
|
BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid),
|
||||||
&mut self.cx,
|
&mut self.cx,
|
||||||
);
|
);
|
||||||
@@ -1589,7 +1599,7 @@ mod deneb_only {
|
|||||||
fn invalid_blob_processed(mut self) -> Self {
|
fn invalid_blob_processed(mut self) -> Self {
|
||||||
self.bl
|
self.bl
|
||||||
.single_block_component_processed::<BlobRequestState<Current, E>>(
|
.single_block_component_processed::<BlobRequestState<Current, E>>(
|
||||||
self.blob_req_id.expect("blob request id"),
|
self.blob_req_id.expect("blob request id").id,
|
||||||
BlockProcessingResult::Err(BlockError::BlobValidation(
|
BlockProcessingResult::Err(BlockError::BlobValidation(
|
||||||
BlobError::ProposerSignatureInvalid,
|
BlobError::ProposerSignatureInvalid,
|
||||||
)),
|
)),
|
||||||
@@ -1602,7 +1612,7 @@ mod deneb_only {
|
|||||||
fn missing_components_from_block_request(mut self) -> Self {
|
fn missing_components_from_block_request(mut self) -> Self {
|
||||||
self.bl
|
self.bl
|
||||||
.single_block_component_processed::<BlockRequestState<Current>>(
|
.single_block_component_processed::<BlockRequestState<Current>>(
|
||||||
self.block_req_id.expect("block request id"),
|
self.block_req_id.expect("block request id").id,
|
||||||
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
|
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
|
||||||
self.slot,
|
self.slot,
|
||||||
self.block_root,
|
self.block_root,
|
||||||
@@ -1616,7 +1626,7 @@ mod deneb_only {
|
|||||||
fn missing_components_from_blob_request(mut self) -> Self {
|
fn missing_components_from_blob_request(mut self) -> Self {
|
||||||
self.bl
|
self.bl
|
||||||
.single_block_component_processed::<BlobRequestState<Current, E>>(
|
.single_block_component_processed::<BlobRequestState<Current, E>>(
|
||||||
self.blob_req_id.expect("blob request id"),
|
self.blob_req_id.expect("blob request id").id,
|
||||||
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
|
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
|
||||||
self.slot,
|
self.slot,
|
||||||
self.block_root,
|
self.block_root,
|
||||||
|
|||||||
@@ -41,11 +41,10 @@ use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
|
|||||||
use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor};
|
use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor};
|
||||||
use crate::service::NetworkMessage;
|
use crate::service::NetworkMessage;
|
||||||
use crate::status::ToStatusMessage;
|
use crate::status::ToStatusMessage;
|
||||||
|
use crate::sync::block_lookups::common::{Current, Parent};
|
||||||
use crate::sync::block_lookups::delayed_lookup;
|
use crate::sync::block_lookups::delayed_lookup;
|
||||||
use crate::sync::block_lookups::delayed_lookup::DelayedLookupMessage;
|
use crate::sync::block_lookups::delayed_lookup::DelayedLookupMessage;
|
||||||
use crate::sync::block_lookups::{
|
use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, ChildComponents};
|
||||||
BlobRequestState, BlockRequestState, Current, Parent, UnknownParentComponents,
|
|
||||||
};
|
|
||||||
use crate::sync::range_sync::ByRangeRequestType;
|
use crate::sync::range_sync::ByRangeRequestType;
|
||||||
use beacon_chain::block_verification_types::AsBlock;
|
use beacon_chain::block_verification_types::AsBlock;
|
||||||
use beacon_chain::block_verification_types::RpcBlock;
|
use beacon_chain::block_verification_types::RpcBlock;
|
||||||
@@ -84,45 +83,39 @@ pub const DELAY_QUEUE_CHANNEL_SIZE: usize = 128;
|
|||||||
|
|
||||||
pub type Id = u32;
|
pub type Id = u32;
|
||||||
|
|
||||||
#[derive(Debug, Copy, Clone)]
|
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
|
||||||
pub enum ResponseType {
|
pub struct SingleLookupReqId {
|
||||||
Block,
|
pub id: Id,
|
||||||
Blob,
|
pub req_counter: Id,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
|
||||||
|
pub struct ParentLookupReqId {
|
||||||
|
pub id: Id,
|
||||||
|
pub req_counter: Id,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Id of rpc requests sent by sync to the network.
|
/// Id of rpc requests sent by sync to the network.
|
||||||
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
|
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
|
||||||
pub enum RequestId {
|
pub enum RequestId {
|
||||||
/// Request searching for a block given a hash.
|
/// Request searching for a block given a hash.
|
||||||
SingleBlock {
|
SingleBlock { id: SingleLookupReqId },
|
||||||
id: Id,
|
/// Request searching for a set of blobs given a hash.
|
||||||
},
|
SingleBlob { id: SingleLookupReqId },
|
||||||
SingleBlob {
|
/// Request searching for a block's parent. The id is the chain, share with the corresponding
|
||||||
id: Id,
|
/// blob id.
|
||||||
},
|
ParentLookup { id: SingleLookupReqId },
|
||||||
/// Request searching for a block's parent. The id is the chain
|
/// Request searching for a block's parent blobs. The id is the chain, shared with the corresponding
|
||||||
ParentLookup {
|
/// block id.
|
||||||
id: Id,
|
ParentLookupBlob { id: SingleLookupReqId },
|
||||||
},
|
|
||||||
ParentLookupBlob {
|
|
||||||
id: Id,
|
|
||||||
},
|
|
||||||
/// Request was from the backfill sync algorithm.
|
/// Request was from the backfill sync algorithm.
|
||||||
BackFillBlocks {
|
BackFillBlocks { id: Id },
|
||||||
id: Id,
|
|
||||||
},
|
|
||||||
/// Backfill request that is composed by both a block range request and a blob range request.
|
/// Backfill request that is composed by both a block range request and a blob range request.
|
||||||
BackFillBlockAndBlobs {
|
BackFillBlockAndBlobs { id: Id },
|
||||||
id: Id,
|
|
||||||
},
|
|
||||||
/// The request was from a chain in the range sync algorithm.
|
/// The request was from a chain in the range sync algorithm.
|
||||||
RangeBlocks {
|
RangeBlocks { id: Id },
|
||||||
id: Id,
|
|
||||||
},
|
|
||||||
/// Range request that is composed by both a block range request and a blob range request.
|
/// Range request that is composed by both a block range request and a blob range request.
|
||||||
RangeBlockAndBlobs {
|
RangeBlockAndBlobs { id: Id },
|
||||||
id: Id,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -680,7 +673,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
block_root,
|
block_root,
|
||||||
parent_root,
|
parent_root,
|
||||||
blob_slot,
|
blob_slot,
|
||||||
Some(UnknownParentComponents::new(None, Some(blobs))),
|
Some(ChildComponents::new(None, Some(blobs))),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => {
|
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => {
|
||||||
@@ -718,16 +711,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SyncMessage::MissingGossipBlockComponentsDelayed(block_root) => {
|
SyncMessage::MissingGossipBlockComponentsDelayed(block_root) => self
|
||||||
if self
|
.block_lookups
|
||||||
.block_lookups
|
.trigger_lookup_by_root(block_root, &self.network),
|
||||||
.trigger_lookup_by_root(block_root, &self.network)
|
|
||||||
.is_err()
|
|
||||||
{
|
|
||||||
// No request was made for block or blob so the lookup is dropped.
|
|
||||||
self.block_lookups.remove_lookup_by_root(block_root);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
SyncMessage::Disconnect(peer_id) => {
|
SyncMessage::Disconnect(peer_id) => {
|
||||||
self.peer_disconnect(&peer_id);
|
self.peer_disconnect(&peer_id);
|
||||||
}
|
}
|
||||||
@@ -796,7 +782,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
block_root: Hash256,
|
block_root: Hash256,
|
||||||
parent_root: Hash256,
|
parent_root: Hash256,
|
||||||
slot: Slot,
|
slot: Slot,
|
||||||
parent_components: Option<UnknownParentComponents<T::EthSpec>>,
|
parent_components: Option<ChildComponents<T::EthSpec>>,
|
||||||
) {
|
) {
|
||||||
if self.should_search_for_block(slot, &peer_id) {
|
if self.should_search_for_block(slot, &peer_id) {
|
||||||
self.block_lookups.search_parent(
|
self.block_lookups.search_parent(
|
||||||
@@ -951,7 +937,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
seen_timestamp,
|
seen_timestamp,
|
||||||
&self.network,
|
&self.network,
|
||||||
),
|
),
|
||||||
RequestId::SingleBlob { id: _ } => {
|
RequestId::SingleBlob { .. } => {
|
||||||
crit!(self.log, "Blob received during block request"; "peer_id" => %peer_id );
|
crit!(self.log, "Blob received during block request"; "peer_id" => %peer_id );
|
||||||
}
|
}
|
||||||
RequestId::ParentLookup { id } => self
|
RequestId::ParentLookup { id } => self
|
||||||
@@ -1023,7 +1009,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
seen_timestamp: Duration,
|
seen_timestamp: Duration,
|
||||||
) {
|
) {
|
||||||
match request_id {
|
match request_id {
|
||||||
RequestId::SingleBlock { id: _ } => {
|
RequestId::SingleBlock { .. } => {
|
||||||
crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id );
|
crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id );
|
||||||
}
|
}
|
||||||
RequestId::SingleBlob { id } => self
|
RequestId::SingleBlob { id } => self
|
||||||
|
|||||||
@@ -9,6 +9,6 @@ mod network_context;
|
|||||||
mod peer_sync_info;
|
mod peer_sync_info;
|
||||||
mod range_sync;
|
mod range_sync;
|
||||||
|
|
||||||
pub use block_lookups::UnknownParentComponents;
|
pub use block_lookups::ChildComponents;
|
||||||
pub use manager::{BatchProcessResult, SyncMessage};
|
pub use manager::{BatchProcessResult, SyncMessage};
|
||||||
pub use range_sync::{BatchOperationOutcome, ChainId};
|
pub use range_sync::{BatchOperationOutcome, ChainId};
|
||||||
|
|||||||
@@ -7,7 +7,8 @@ use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
|
|||||||
use crate::network_beacon_processor::NetworkBeaconProcessor;
|
use crate::network_beacon_processor::NetworkBeaconProcessor;
|
||||||
use crate::service::{NetworkMessage, RequestId};
|
use crate::service::{NetworkMessage, RequestId};
|
||||||
use crate::status::ToStatusMessage;
|
use crate::status::ToStatusMessage;
|
||||||
use crate::sync::block_lookups::LookupType;
|
use crate::sync::block_lookups::common::LookupType;
|
||||||
|
use crate::sync::manager::SingleLookupReqId;
|
||||||
use beacon_chain::block_verification_types::RpcBlock;
|
use beacon_chain::block_verification_types::RpcBlock;
|
||||||
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
|
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
|
||||||
use fnv::FnvHashMap;
|
use fnv::FnvHashMap;
|
||||||
@@ -404,23 +405,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends a blocks by root request for a parent request.
|
pub fn block_lookup_request(
|
||||||
pub fn single_lookup_request(
|
|
||||||
&self,
|
&self,
|
||||||
id: Id,
|
id: SingleLookupReqId,
|
||||||
peer_id: PeerId,
|
|
||||||
request: BlocksByRootRequest,
|
|
||||||
blob_peer_id: PeerId,
|
|
||||||
blob_request: BlobsByRootRequest,
|
|
||||||
lookup_type: LookupType,
|
|
||||||
) -> Result<(), &'static str> {
|
|
||||||
self.single_block_lookup_request_retry(id, peer_id, request, lookup_type)?;
|
|
||||||
self.single_blob_lookup_request_retry(id, blob_peer_id, blob_request, lookup_type)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
pub fn single_block_lookup_request_retry(
|
|
||||||
&self,
|
|
||||||
id: Id,
|
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
request: BlocksByRootRequest,
|
request: BlocksByRootRequest,
|
||||||
lookup_type: LookupType,
|
lookup_type: LookupType,
|
||||||
@@ -448,14 +435,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn single_blob_lookup_request_retry(
|
pub fn blob_lookup_request(
|
||||||
&self,
|
&self,
|
||||||
id: Id,
|
id: SingleLookupReqId,
|
||||||
blob_peer_id: PeerId,
|
blob_peer_id: PeerId,
|
||||||
blob_request: BlobsByRootRequest,
|
blob_request: BlobsByRootRequest,
|
||||||
lookup_type: LookupType,
|
lookup_type: LookupType,
|
||||||
) -> Result<(), &'static str> {
|
) -> Result<(), &'static str> {
|
||||||
let request_id = RequestId::Sync(SyncRequestId::SingleBlob { id });
|
let sync_id = match lookup_type {
|
||||||
|
LookupType::Current => SyncRequestId::SingleBlock { id },
|
||||||
|
LookupType::Parent => SyncRequestId::ParentLookup { id },
|
||||||
|
};
|
||||||
|
let request_id = RequestId::Sync(sync_id);
|
||||||
|
|
||||||
if !blob_request.blob_ids.is_empty() {
|
if !blob_request.blob_ids.is_empty() {
|
||||||
trace!(
|
trace!(
|
||||||
@@ -558,6 +549,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
"To deal with alignment with deneb boundaries, batches need to be of just one epoch"
|
"To deal with alignment with deneb boundaries, batches need to be of just one epoch"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
{
|
||||||
|
// Keep tests only for blocks.
|
||||||
|
ByRangeRequestType::Blocks
|
||||||
|
}
|
||||||
|
#[cfg(not(test))]
|
||||||
if let Some(data_availability_boundary) = self.chain.data_availability_boundary() {
|
if let Some(data_availability_boundary) = self.chain.data_availability_boundary() {
|
||||||
if epoch >= data_availability_boundary {
|
if epoch >= data_availability_boundary {
|
||||||
ByRangeRequestType::BlocksAndBlobs
|
ByRangeRequestType::BlocksAndBlobs
|
||||||
|
|||||||
Reference in New Issue
Block a user