mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-04 21:34:36 +00:00
Add EnvelopeRequestState logic
This commit is contained in:
@@ -2,7 +2,7 @@ use crate::sync::block_lookups::single_block_lookup::{
|
|||||||
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
|
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
|
||||||
};
|
};
|
||||||
use crate::sync::block_lookups::{
|
use crate::sync::block_lookups::{
|
||||||
BlobRequestState, BlockRequestState, CustodyRequestState, PeerId,
|
BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState, PeerId,
|
||||||
};
|
};
|
||||||
use crate::sync::manager::BlockProcessType;
|
use crate::sync::manager::BlockProcessType;
|
||||||
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
|
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
|
||||||
@@ -12,16 +12,17 @@ use parking_lot::RwLock;
|
|||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use types::data::FixedBlobSidecarList;
|
use types::data::FixedBlobSidecarList;
|
||||||
use types::{DataColumnSidecarList, SignedBeaconBlock};
|
use types::{DataColumnSidecarList, SignedBeaconBlock, SignedExecutionPayloadEnvelope};
|
||||||
|
|
||||||
use super::SingleLookupId;
|
use super::SingleLookupId;
|
||||||
use super::single_block_lookup::{ComponentRequests, DownloadResult};
|
use super::single_block_lookup::{ComponentRequests, DownloadResult};
|
||||||
|
|
||||||
#[derive(Debug, Copy, Clone)]
|
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||||
pub enum ResponseType {
|
pub enum ResponseType {
|
||||||
Block,
|
Block,
|
||||||
Blob,
|
Blob,
|
||||||
CustodyColumn,
|
CustodyColumn,
|
||||||
|
Envelope,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This trait unifies common single block lookup functionality across blocks and blobs. This
|
/// This trait unifies common single block lookup functionality across blocks and blobs. This
|
||||||
@@ -151,6 +152,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
|
|||||||
ComponentRequests::WaitingForBlock => Err("waiting for block"),
|
ComponentRequests::WaitingForBlock => Err("waiting for block"),
|
||||||
ComponentRequests::ActiveBlobRequest(request, _) => Ok(request),
|
ComponentRequests::ActiveBlobRequest(request, _) => Ok(request),
|
||||||
ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"),
|
ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"),
|
||||||
|
ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"),
|
||||||
ComponentRequests::NotNeeded { .. } => Err("not needed"),
|
ComponentRequests::NotNeeded { .. } => Err("not needed"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -205,6 +207,7 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
|
|||||||
ComponentRequests::WaitingForBlock => Err("waiting for block"),
|
ComponentRequests::WaitingForBlock => Err("waiting for block"),
|
||||||
ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"),
|
ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"),
|
||||||
ComponentRequests::ActiveCustodyRequest(request) => Ok(request),
|
ComponentRequests::ActiveCustodyRequest(request) => Ok(request),
|
||||||
|
ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"),
|
||||||
ComponentRequests::NotNeeded { .. } => Err("not needed"),
|
ComponentRequests::NotNeeded { .. } => Err("not needed"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -215,3 +218,52 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
|
|||||||
&mut self.state
|
&mut self.state
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T: BeaconChainTypes> RequestState<T> for EnvelopeRequestState<T::EthSpec> {
|
||||||
|
type VerifiedResponseType = Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>;
|
||||||
|
|
||||||
|
fn make_request(
|
||||||
|
&self,
|
||||||
|
id: Id,
|
||||||
|
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||||
|
_: usize,
|
||||||
|
cx: &mut SyncNetworkContext<T>,
|
||||||
|
) -> Result<LookupRequestResult, LookupRequestError> {
|
||||||
|
cx.envelope_lookup_request(id, lookup_peers, self.block_root)
|
||||||
|
.map_err(LookupRequestError::SendFailedNetwork)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_for_processing(
|
||||||
|
id: Id,
|
||||||
|
download_result: DownloadResult<Self::VerifiedResponseType>,
|
||||||
|
cx: &SyncNetworkContext<T>,
|
||||||
|
) -> Result<(), LookupRequestError> {
|
||||||
|
let DownloadResult {
|
||||||
|
value,
|
||||||
|
block_root,
|
||||||
|
seen_timestamp,
|
||||||
|
..
|
||||||
|
} = download_result;
|
||||||
|
cx.send_envelope_for_processing(id, value, seen_timestamp, block_root)
|
||||||
|
.map_err(LookupRequestError::SendFailedProcessor)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn response_type() -> ResponseType {
|
||||||
|
ResponseType::Envelope
|
||||||
|
}
|
||||||
|
|
||||||
|
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
|
||||||
|
match &mut request.component_requests {
|
||||||
|
ComponentRequests::ActiveEnvelopeRequest(request) => Ok(request),
|
||||||
|
_ => Err("expecting envelope request"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
|
||||||
|
&self.state
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType> {
|
||||||
|
&mut self.state
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -39,7 +39,9 @@ use fnv::FnvHashMap;
|
|||||||
use lighthouse_network::service::api_types::SingleLookupReqId;
|
use lighthouse_network::service::api_types::SingleLookupReqId;
|
||||||
use lighthouse_network::{PeerAction, PeerId};
|
use lighthouse_network::{PeerAction, PeerId};
|
||||||
use lru_cache::LRUTimeCache;
|
use lru_cache::LRUTimeCache;
|
||||||
pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState};
|
pub use single_block_lookup::{
|
||||||
|
BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState,
|
||||||
|
};
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -344,6 +346,57 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
self.new_current_lookup(block_root_to_search, None, None, peers, cx)
|
self.new_current_lookup(block_root_to_search, None, None, peers, cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A block triggers the search of a parent envelope.
|
||||||
|
#[must_use = "only reference the new lookup if returns true"]
|
||||||
|
pub fn search_parent_envelope_of_child(
|
||||||
|
&mut self,
|
||||||
|
parent_root: Hash256,
|
||||||
|
peers: &[PeerId],
|
||||||
|
cx: &mut SyncNetworkContext<T>,
|
||||||
|
) -> bool {
|
||||||
|
// Check if there's already a lookup for this root (could be a block lookup or envelope
|
||||||
|
// lookup). If so, add peers and let it handle the envelope.
|
||||||
|
if let Some((&lookup_id, _lookup)) = self
|
||||||
|
.single_block_lookups
|
||||||
|
.iter_mut()
|
||||||
|
.find(|(_, lookup)| lookup.is_for_block(parent_root))
|
||||||
|
{
|
||||||
|
if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) {
|
||||||
|
warn!(error = ?e, "Error adding peers to envelope lookup");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.single_block_lookups.len() >= MAX_LOOKUPS {
|
||||||
|
warn!(?parent_root, "Dropping envelope lookup reached max");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let lookup = SingleBlockLookup::new_envelope_only(parent_root, peers, cx.next_id());
|
||||||
|
let _guard = lookup.span.clone().entered();
|
||||||
|
|
||||||
|
let id = lookup.id;
|
||||||
|
let lookup = match self.single_block_lookups.entry(id) {
|
||||||
|
Entry::Vacant(entry) => entry.insert(lookup),
|
||||||
|
Entry::Occupied(_) => {
|
||||||
|
warn!(id, "Lookup exists with same id");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
?peers,
|
||||||
|
?parent_root,
|
||||||
|
id = lookup.id,
|
||||||
|
"Created envelope-only lookup"
|
||||||
|
);
|
||||||
|
metrics::inc_counter(&metrics::SYNC_LOOKUP_CREATED);
|
||||||
|
self.metrics.created_lookups += 1;
|
||||||
|
|
||||||
|
let result = lookup.continue_requests(cx);
|
||||||
|
self.on_lookup_result(id, result, "new_envelope_lookup", cx)
|
||||||
|
}
|
||||||
|
|
||||||
/// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is
|
/// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is
|
||||||
/// constructed.
|
/// constructed.
|
||||||
/// Returns true if the lookup is created or already exists
|
/// Returns true if the lookup is created or already exists
|
||||||
@@ -561,17 +614,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
|
self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
|
||||||
}
|
}
|
||||||
BlockProcessType::SinglePayloadEnvelope { id, block_root } => {
|
BlockProcessType::SinglePayloadEnvelope { id, block_root } => {
|
||||||
match result {
|
let result = self
|
||||||
BlockProcessingResult::Ok(_) => {
|
.on_processing_result_inner::<EnvelopeRequestState<T::EthSpec>>(id, result, cx);
|
||||||
self.continue_envelope_child_lookups(block_root, cx);
|
// On successful envelope import, unblock child lookups waiting for this envelope
|
||||||
}
|
if matches!(&result, Ok(LookupResult::Completed)) {
|
||||||
BlockProcessingResult::Err(e) => {
|
self.continue_envelope_child_lookups(block_root, cx);
|
||||||
debug!(%id, error = ?e, "Payload envelope processing failed");
|
|
||||||
// TODO(EIP-7732): resolve awaiting_envelope on affected lookups so they can retry
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
}
|
||||||
return;
|
result
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
|
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
|
||||||
@@ -721,6 +770,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
ResponseType::CustodyColumn => {
|
ResponseType::CustodyColumn => {
|
||||||
"lookup_custody_column_processing_failure"
|
"lookup_custody_column_processing_failure"
|
||||||
}
|
}
|
||||||
|
ResponseType::Envelope => "lookup_envelope_processing_failure",
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -764,22 +814,20 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
}
|
}
|
||||||
Action::ParentEnvelopeUnknown { parent_root } => {
|
Action::ParentEnvelopeUnknown { parent_root } => {
|
||||||
let peers = lookup.all_peers();
|
let peers = lookup.all_peers();
|
||||||
lookup.set_awaiting_envelope(parent_root);
|
lookup.set_awaiting_parent_envelope(parent_root);
|
||||||
// Pick a peer to request the envelope from
|
let envelope_lookup_exists = self.search_parent_envelope_of_child(parent_root, &peers, cx);
|
||||||
let peer_id = peers.first().copied().ok_or_else(|| {
|
if envelope_lookup_exists {
|
||||||
LookupRequestError::Failed("No peers available for envelope request".to_owned())
|
debug!(
|
||||||
})?;
|
id = lookup_id,
|
||||||
match cx.envelope_lookup_request(lookup_id, peer_id, parent_root) {
|
?block_root,
|
||||||
Ok(_) => {
|
?parent_root,
|
||||||
debug!(
|
"Marking lookup as awaiting parent envelope"
|
||||||
id = lookup_id,
|
);
|
||||||
?block_root,
|
Ok(LookupResult::Pending)
|
||||||
?parent_root,
|
} else {
|
||||||
"Requesting missing parent envelope"
|
Err(LookupRequestError::Failed(format!(
|
||||||
);
|
"Envelope lookup could not be created for {parent_root:?}"
|
||||||
Ok(LookupResult::Pending)
|
)))
|
||||||
}
|
|
||||||
Err(e) => Err(LookupRequestError::SendFailedNetwork(e)),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Action::Drop(reason) => {
|
Action::Drop(reason) => {
|
||||||
@@ -858,8 +906,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
let mut lookup_results = vec![];
|
let mut lookup_results = vec![];
|
||||||
|
|
||||||
for (id, lookup) in self.single_block_lookups.iter_mut() {
|
for (id, lookup) in self.single_block_lookups.iter_mut() {
|
||||||
if lookup.awaiting_envelope() == Some(block_root) {
|
if lookup.awaiting_parent_envelope() == Some(block_root) {
|
||||||
lookup.resolve_awaiting_envelope();
|
lookup.resolve_awaiting_parent_envelope();
|
||||||
debug!(
|
debug!(
|
||||||
envelope_root = ?block_root,
|
envelope_root = ?block_root,
|
||||||
id,
|
id,
|
||||||
@@ -894,7 +942,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
let child_lookups = self
|
let child_lookups = self
|
||||||
.single_block_lookups
|
.single_block_lookups
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|(_, lookup)| lookup.awaiting_parent() == Some(dropped_lookup.block_root()))
|
.filter(|(_, lookup)| {
|
||||||
|
lookup.awaiting_parent() == Some(dropped_lookup.block_root())
|
||||||
|
|| lookup.awaiting_parent_envelope() == Some(dropped_lookup.block_root())
|
||||||
|
})
|
||||||
.map(|(id, _)| *id)
|
.map(|(id, _)| *id)
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
|||||||
@@ -16,7 +16,9 @@ use store::Hash256;
|
|||||||
use strum::IntoStaticStr;
|
use strum::IntoStaticStr;
|
||||||
use tracing::{Span, debug_span};
|
use tracing::{Span, debug_span};
|
||||||
use types::data::FixedBlobSidecarList;
|
use types::data::FixedBlobSidecarList;
|
||||||
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot};
|
use types::{
|
||||||
|
DataColumnSidecarList, EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
|
||||||
|
};
|
||||||
|
|
||||||
// Dedicated enum for LookupResult to force its usage
|
// Dedicated enum for LookupResult to force its usage
|
||||||
#[must_use = "LookupResult must be handled with on_lookup_result"]
|
#[must_use = "LookupResult must be handled with on_lookup_result"]
|
||||||
@@ -70,7 +72,7 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
|
|||||||
peers: Arc<RwLock<HashSet<PeerId>>>,
|
peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||||
block_root: Hash256,
|
block_root: Hash256,
|
||||||
awaiting_parent: Option<Hash256>,
|
awaiting_parent: Option<Hash256>,
|
||||||
awaiting_envelope: Option<Hash256>,
|
awaiting_parent_envelope: Option<Hash256>,
|
||||||
created: Instant,
|
created: Instant,
|
||||||
pub(crate) span: Span,
|
pub(crate) span: Span,
|
||||||
}
|
}
|
||||||
@@ -80,6 +82,7 @@ pub(crate) enum ComponentRequests<E: EthSpec> {
|
|||||||
WaitingForBlock,
|
WaitingForBlock,
|
||||||
ActiveBlobRequest(BlobRequestState<E>, usize),
|
ActiveBlobRequest(BlobRequestState<E>, usize),
|
||||||
ActiveCustodyRequest(CustodyRequestState<E>),
|
ActiveCustodyRequest(CustodyRequestState<E>),
|
||||||
|
ActiveEnvelopeRequest(EnvelopeRequestState<E>),
|
||||||
// When printing in debug this state display the reason why it's not needed
|
// When printing in debug this state display the reason why it's not needed
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
NotNeeded(&'static str),
|
NotNeeded(&'static str),
|
||||||
@@ -105,12 +108,26 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))),
|
peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))),
|
||||||
block_root: requested_block_root,
|
block_root: requested_block_root,
|
||||||
awaiting_parent,
|
awaiting_parent,
|
||||||
awaiting_envelope: None,
|
awaiting_parent_envelope: None,
|
||||||
created: Instant::now(),
|
created: Instant::now(),
|
||||||
span: lookup_span,
|
span: lookup_span,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create an envelope-only lookup. The block is already imported, we just need the envelope.
|
||||||
|
pub fn new_envelope_only(block_root: Hash256, peers: &[PeerId], id: Id) -> Self {
|
||||||
|
let mut lookup = Self::new(block_root, peers, id, None);
|
||||||
|
// Block is already imported, mark as completed
|
||||||
|
lookup
|
||||||
|
.block_request_state
|
||||||
|
.state
|
||||||
|
.on_completed_request("block already imported")
|
||||||
|
.expect("block state starts as AwaitingDownload");
|
||||||
|
lookup.component_requests =
|
||||||
|
ComponentRequests::ActiveEnvelopeRequest(EnvelopeRequestState::new(block_root));
|
||||||
|
lookup
|
||||||
|
}
|
||||||
|
|
||||||
/// Reset the status of all internal requests
|
/// Reset the status of all internal requests
|
||||||
pub fn reset_requests(&mut self) {
|
pub fn reset_requests(&mut self) {
|
||||||
self.block_request_state = BlockRequestState::new(self.block_root);
|
self.block_request_state = BlockRequestState::new(self.block_root);
|
||||||
@@ -146,18 +163,18 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
self.awaiting_parent = None;
|
self.awaiting_parent = None;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn awaiting_envelope(&self) -> Option<Hash256> {
|
pub fn awaiting_parent_envelope(&self) -> Option<Hash256> {
|
||||||
self.awaiting_envelope
|
self.awaiting_parent_envelope
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark this lookup as awaiting a parent envelope to be imported before processing.
|
/// Mark this lookup as awaiting a parent envelope to be imported before processing.
|
||||||
pub fn set_awaiting_envelope(&mut self, parent_root: Hash256) {
|
pub fn set_awaiting_parent_envelope(&mut self, parent_root: Hash256) {
|
||||||
self.awaiting_envelope = Some(parent_root);
|
self.awaiting_parent_envelope = Some(parent_root);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark this lookup as no longer awaiting a parent envelope.
|
/// Mark this lookup as no longer awaiting a parent envelope.
|
||||||
pub fn resolve_awaiting_envelope(&mut self) {
|
pub fn resolve_awaiting_parent_envelope(&mut self) {
|
||||||
self.awaiting_envelope = None;
|
self.awaiting_parent_envelope = None;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the time elapsed since this lookup was created
|
/// Returns the time elapsed since this lookup was created
|
||||||
@@ -194,6 +211,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
ComponentRequests::WaitingForBlock => false,
|
ComponentRequests::WaitingForBlock => false,
|
||||||
ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(),
|
ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(),
|
||||||
ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(),
|
ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(),
|
||||||
|
ComponentRequests::ActiveEnvelopeRequest(request) => request.state.is_processed(),
|
||||||
ComponentRequests::NotNeeded { .. } => true,
|
ComponentRequests::NotNeeded { .. } => true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -201,7 +219,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
/// Returns true if this request is expecting some event to make progress
|
/// Returns true if this request is expecting some event to make progress
|
||||||
pub fn is_awaiting_event(&self) -> bool {
|
pub fn is_awaiting_event(&self) -> bool {
|
||||||
self.awaiting_parent.is_some()
|
self.awaiting_parent.is_some()
|
||||||
|| self.awaiting_envelope.is_some()
|
|| self.awaiting_parent_envelope.is_some()
|
||||||
|| self.block_request_state.state.is_awaiting_event()
|
|| self.block_request_state.state.is_awaiting_event()
|
||||||
|| match &self.component_requests {
|
|| match &self.component_requests {
|
||||||
// If components are waiting for the block request to complete, here we should
|
// If components are waiting for the block request to complete, here we should
|
||||||
@@ -214,6 +232,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
ComponentRequests::ActiveCustodyRequest(request) => {
|
ComponentRequests::ActiveCustodyRequest(request) => {
|
||||||
request.state.is_awaiting_event()
|
request.state.is_awaiting_event()
|
||||||
}
|
}
|
||||||
|
ComponentRequests::ActiveEnvelopeRequest(request) => {
|
||||||
|
request.state.is_awaiting_event()
|
||||||
|
}
|
||||||
ComponentRequests::NotNeeded { .. } => false,
|
ComponentRequests::NotNeeded { .. } => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -283,6 +304,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
ComponentRequests::ActiveCustodyRequest(_) => {
|
ComponentRequests::ActiveCustodyRequest(_) => {
|
||||||
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx, 0)?
|
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx, 0)?
|
||||||
}
|
}
|
||||||
|
ComponentRequests::ActiveEnvelopeRequest(_) => {
|
||||||
|
self.continue_request::<EnvelopeRequestState<T::EthSpec>>(cx, 0)?
|
||||||
|
}
|
||||||
ComponentRequests::NotNeeded { .. } => {} // do nothing
|
ComponentRequests::NotNeeded { .. } => {} // do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -304,7 +328,8 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
expected_blobs: usize,
|
expected_blobs: usize,
|
||||||
) -> Result<(), LookupRequestError> {
|
) -> Result<(), LookupRequestError> {
|
||||||
let id = self.id;
|
let id = self.id;
|
||||||
let awaiting_event = self.awaiting_parent.is_some() || self.awaiting_envelope.is_some();
|
let awaiting_event =
|
||||||
|
self.awaiting_parent.is_some() || self.awaiting_parent_envelope.is_some();
|
||||||
let request =
|
let request =
|
||||||
R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
|
R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
|
||||||
|
|
||||||
@@ -444,6 +469,26 @@ impl<E: EthSpec> BlockRequestState<E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The state of the envelope request component of a `SingleBlockLookup`.
|
||||||
|
/// Used for envelope-only lookups where the parent block is already imported
|
||||||
|
/// but its execution payload envelope is missing.
|
||||||
|
#[derive(Educe)]
|
||||||
|
#[educe(Debug)]
|
||||||
|
pub struct EnvelopeRequestState<E: EthSpec> {
|
||||||
|
#[educe(Debug(ignore))]
|
||||||
|
pub block_root: Hash256,
|
||||||
|
pub state: SingleLookupRequestState<Arc<SignedExecutionPayloadEnvelope<E>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E: EthSpec> EnvelopeRequestState<E> {
|
||||||
|
pub fn new(block_root: Hash256) -> Self {
|
||||||
|
Self {
|
||||||
|
block_root,
|
||||||
|
state: SingleLookupRequestState::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct DownloadResult<T: Clone> {
|
pub struct DownloadResult<T: Clone> {
|
||||||
pub value: T,
|
pub value: T,
|
||||||
|
|||||||
@@ -45,6 +45,7 @@ use crate::service::NetworkMessage;
|
|||||||
use crate::status::ToStatusMessage;
|
use crate::status::ToStatusMessage;
|
||||||
use crate::sync::block_lookups::{
|
use crate::sync::block_lookups::{
|
||||||
BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult,
|
BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult,
|
||||||
|
EnvelopeRequestState,
|
||||||
};
|
};
|
||||||
use crate::sync::custody_backfill_sync::CustodyBackFillSync;
|
use crate::sync::custody_backfill_sync::CustodyBackFillSync;
|
||||||
use crate::sync::network_context::{PeerGroup, RpcResponseResult};
|
use crate::sync::network_context::{PeerGroup, RpcResponseResult};
|
||||||
@@ -1278,27 +1279,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
.network
|
.network
|
||||||
.on_single_envelope_response(id, peer_id, rpc_event)
|
.on_single_envelope_response(id, peer_id, rpc_event)
|
||||||
{
|
{
|
||||||
match resp {
|
self.block_lookups
|
||||||
Ok((envelope, seen_timestamp)) => {
|
.on_download_response::<EnvelopeRequestState<T::EthSpec>>(
|
||||||
let block_root = envelope.beacon_block_root();
|
id,
|
||||||
debug!(
|
resp.map(|(value, seen_timestamp)| {
|
||||||
?block_root,
|
(value, PeerGroup::from_single(peer_id), seen_timestamp)
|
||||||
%id,
|
}),
|
||||||
"Downloaded payload envelope, sending for processing"
|
&mut self.network,
|
||||||
);
|
)
|
||||||
if let Err(e) = self.network.send_envelope_for_processing(
|
|
||||||
id.req_id,
|
|
||||||
envelope,
|
|
||||||
seen_timestamp,
|
|
||||||
block_root,
|
|
||||||
) {
|
|
||||||
error!(error = ?e, "Failed to send envelope for processing");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
debug!(error = ?e, %id, "Payload envelope download failed");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -944,9 +944,26 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
pub fn envelope_lookup_request(
|
pub fn envelope_lookup_request(
|
||||||
&mut self,
|
&mut self,
|
||||||
lookup_id: SingleLookupId,
|
lookup_id: SingleLookupId,
|
||||||
peer_id: PeerId,
|
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||||
block_root: Hash256,
|
block_root: Hash256,
|
||||||
) -> Result<Id, RpcRequestSendError> {
|
) -> Result<LookupRequestResult, RpcRequestSendError> {
|
||||||
|
let active_request_count_by_peer = self.active_request_count_by_peer();
|
||||||
|
let Some(peer_id) = lookup_peers
|
||||||
|
.read()
|
||||||
|
.iter()
|
||||||
|
.map(|peer| {
|
||||||
|
(
|
||||||
|
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
|
||||||
|
rand::random::<u32>(),
|
||||||
|
peer,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.min()
|
||||||
|
.map(|(_, _, peer)| *peer)
|
||||||
|
else {
|
||||||
|
return Ok(LookupRequestResult::Pending("no peers"));
|
||||||
|
};
|
||||||
|
|
||||||
let id = SingleLookupReqId {
|
let id = SingleLookupReqId {
|
||||||
lookup_id,
|
lookup_id,
|
||||||
req_id: self.next_id(),
|
req_id: self.next_id(),
|
||||||
@@ -988,7 +1005,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
request_span,
|
request_span,
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(id.req_id)
|
Ok(LookupRequestResult::RequestSent(id.req_id))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
|
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
|
||||||
|
|||||||
Reference in New Issue
Block a user