mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-25 16:58:28 +00:00
Remove lookup
This commit is contained in:
@@ -1290,19 +1290,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
self.send_sync_message(SyncMessage::UnknownParentBlock(peer_id, block, block_root));
|
||||
return None;
|
||||
}
|
||||
Err(BlockError::ParentEnvelopeUnknown { parent_root }) => {
|
||||
// The parent block is known but its execution payload envelope has not
|
||||
// been received yet. Queue this block for reprocessing and trigger an
|
||||
// envelope lookup.
|
||||
debug!(
|
||||
?block_root,
|
||||
?parent_root,
|
||||
"Parent envelope not yet available for gossip block"
|
||||
);
|
||||
// TODO(gloas): trigger an envelope lookup for `parent_root` and queue
|
||||
// this block for reprocessing once the envelope arrives.
|
||||
return None;
|
||||
}
|
||||
Err(e @ BlockError::BeaconChainError(_)) => {
|
||||
debug!(
|
||||
error = ?e,
|
||||
@@ -1591,17 +1578,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
"Block with unknown parent attempted to be processed"
|
||||
);
|
||||
}
|
||||
Err(BlockError::ParentEnvelopeUnknown { parent_root }) => {
|
||||
// The parent block is known but its execution payload envelope has not
|
||||
// been received yet.
|
||||
debug!(
|
||||
%block_root,
|
||||
?parent_root,
|
||||
"Parent envelope not yet available, need envelope lookup"
|
||||
);
|
||||
// TODO(gloas): trigger an envelope lookup for `parent_root` and queue
|
||||
// this block for reprocessing once the envelope arrives.
|
||||
}
|
||||
Err(e @ BlockError::ExecutionPayloadError(epe)) if !epe.penalize_peer() => {
|
||||
debug!(
|
||||
error = %e,
|
||||
|
||||
@@ -541,22 +541,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a new `Work` event for an RPC payload envelope.
|
||||
pub fn send_rpc_payload_envelope(
|
||||
self: &Arc<Self>,
|
||||
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let process_fn =
|
||||
self.clone()
|
||||
.generate_rpc_envelope_process_fn(envelope, seen_timestamp, process_type);
|
||||
self.try_send(BeaconWorkEvent {
|
||||
drop_during_sync: false,
|
||||
work: Work::RpcPayloadEnvelope { process_fn },
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a new `Work` event for some blobs, where the result from computation (if any) is
|
||||
/// sent to the other side of `result_tx`.
|
||||
pub fn send_rpc_blobs(
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::sync::BatchProcessResult;
|
||||
use crate::sync::manager::CustodyBatchProcessResult;
|
||||
use crate::sync::{
|
||||
ChainId,
|
||||
manager::{BlockProcessType, BlockProcessingResult, SyncMessage},
|
||||
manager::{BlockProcessType, SyncMessage},
|
||||
};
|
||||
use beacon_chain::block_verification_types::LookupBlock;
|
||||
use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock};
|
||||
@@ -28,9 +28,7 @@ use store::KzgCommitment;
|
||||
use tracing::{debug, debug_span, error, info, instrument, warn};
|
||||
use types::data::FixedBlobSidecarList;
|
||||
use types::kzg_ext::format_kzg_commitments;
|
||||
use types::{
|
||||
BlockImportSource, DataColumnSidecarList, Epoch, Hash256, SignedExecutionPayloadEnvelope,
|
||||
};
|
||||
use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256};
|
||||
|
||||
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
@@ -76,79 +74,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
}
|
||||
|
||||
/// Returns an async closure which processes a payload envelope received via RPC.
|
||||
pub fn generate_rpc_envelope_process_fn(
|
||||
self: Arc<Self>,
|
||||
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
) -> AsyncFn {
|
||||
let process_fn = async move {
|
||||
self.process_rpc_envelope(envelope, seen_timestamp, process_type)
|
||||
.await;
|
||||
};
|
||||
Box::pin(process_fn)
|
||||
}
|
||||
|
||||
/// Process an execution payload envelope received via RPC.
|
||||
async fn process_rpc_envelope(
|
||||
self: Arc<Self>,
|
||||
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||
_seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
) {
|
||||
let beacon_block_root = envelope.beacon_block_root();
|
||||
|
||||
// Verify the envelope using the gossip verification path (same checks apply to RPC)
|
||||
let verified_envelope = match self.chain.verify_envelope_for_gossip(envelope).await {
|
||||
Ok(verified) => verified,
|
||||
Err(e) => {
|
||||
debug!(
|
||||
error = ?e,
|
||||
?beacon_block_root,
|
||||
"RPC payload envelope failed verification"
|
||||
);
|
||||
self.send_sync_message(SyncMessage::BlockComponentProcessed {
|
||||
process_type,
|
||||
result: BlockProcessingResult::Err(BlockError::InternalError(format!(
|
||||
"Envelope verification failed: {e:?}"
|
||||
))),
|
||||
});
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Process the verified envelope
|
||||
let result = self
|
||||
.chain
|
||||
.process_execution_payload_envelope(
|
||||
beacon_block_root,
|
||||
verified_envelope,
|
||||
NotifyExecutionLayer::Yes,
|
||||
BlockImportSource::Lookup,
|
||||
|| Ok(()),
|
||||
)
|
||||
.await;
|
||||
|
||||
let processing_result = match result {
|
||||
Ok(status) => BlockProcessingResult::Ok(status),
|
||||
Err(e) => {
|
||||
debug!(
|
||||
error = ?e,
|
||||
?beacon_block_root,
|
||||
"RPC payload envelope processing failed"
|
||||
);
|
||||
BlockProcessingResult::Err(BlockError::InternalError(format!(
|
||||
"Envelope processing failed: {e:?}"
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
self.send_sync_message(SyncMessage::BlockComponentProcessed {
|
||||
process_type,
|
||||
result: processing_result,
|
||||
});
|
||||
}
|
||||
|
||||
/// Returns the `process_fn` and `ignore_fn` required when requeuing an RPC block.
|
||||
pub fn generate_lookup_beacon_block_fns(
|
||||
self: Arc<Self>,
|
||||
|
||||
@@ -713,34 +713,14 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
pub fn on_payload_envelopes_by_root_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
app_request_id: AppRequestId,
|
||||
envelope: Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
|
||||
_app_request_id: AppRequestId,
|
||||
_envelope: Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
|
||||
) {
|
||||
let sync_request_id = match app_request_id {
|
||||
AppRequestId::Sync(sync_id) => match sync_id {
|
||||
id @ SyncRequestId::SinglePayloadEnvelope { .. } => id,
|
||||
other => {
|
||||
crit!(request = ?other, "PayloadEnvelopesByRoot response on incorrect request");
|
||||
return;
|
||||
}
|
||||
},
|
||||
AppRequestId::Router => {
|
||||
crit!(%peer_id, "All PayloadEnvelopesByRoot requests belong to sync");
|
||||
return;
|
||||
}
|
||||
AppRequestId::Internal => unreachable!("Handled internally"),
|
||||
};
|
||||
|
||||
trace!(
|
||||
// TODO(EIP-7732): Envelope lookup sync not yet implemented on this branch.
|
||||
crit!(
|
||||
%peer_id,
|
||||
"Received PayloadEnvelopesByRoot Response"
|
||||
"Received unexpected PayloadEnvelopesByRoot response"
|
||||
);
|
||||
self.send_to_sync(SyncMessage::RpcPayloadEnvelope {
|
||||
peer_id,
|
||||
sync_request_id,
|
||||
envelope,
|
||||
seen_timestamp: timestamp_now(),
|
||||
});
|
||||
}
|
||||
|
||||
/// Handle a `BlobsByRoot` response from the peer.
|
||||
|
||||
@@ -109,7 +109,6 @@ pub type SingleLookupId = u32;
|
||||
enum Action {
|
||||
Retry,
|
||||
ParentUnknown { parent_root: Hash256 },
|
||||
ParentEnvelopeUnknown { parent_root: Hash256 },
|
||||
Drop(/* reason: */ String),
|
||||
Continue,
|
||||
}
|
||||
@@ -560,19 +559,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
BlockProcessType::SingleCustodyColumn(id) => {
|
||||
self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
|
||||
}
|
||||
BlockProcessType::SinglePayloadEnvelope { id, block_root } => {
|
||||
match result {
|
||||
BlockProcessingResult::Ok(_) => {
|
||||
self.continue_envelope_child_lookups(block_root, cx);
|
||||
}
|
||||
BlockProcessingResult::Err(e) => {
|
||||
debug!(%id, error = ?e, "Payload envelope processing failed");
|
||||
// TODO(EIP-7732): resolve awaiting_envelope on affected lookups so they can retry
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
|
||||
}
|
||||
@@ -659,12 +645,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
request_state.revert_to_awaiting_processing()?;
|
||||
Action::ParentUnknown { parent_root }
|
||||
}
|
||||
BlockError::ParentEnvelopeUnknown { parent_root } => {
|
||||
// The parent block is known but its execution payload envelope is missing.
|
||||
// Revert to awaiting processing and fetch the envelope via RPC.
|
||||
request_state.revert_to_awaiting_processing()?;
|
||||
Action::ParentEnvelopeUnknown { parent_root }
|
||||
}
|
||||
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
|
||||
// These errors indicate that the execution layer is offline
|
||||
// and failed to validate the execution payload. Do not downscore peer.
|
||||
@@ -762,26 +742,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
)))
|
||||
}
|
||||
}
|
||||
Action::ParentEnvelopeUnknown { parent_root } => {
|
||||
let peers = lookup.all_peers();
|
||||
lookup.set_awaiting_envelope(parent_root);
|
||||
// Pick a peer to request the envelope from
|
||||
let peer_id = peers.first().copied().ok_or_else(|| {
|
||||
LookupRequestError::Failed("No peers available for envelope request".to_owned())
|
||||
})?;
|
||||
match cx.envelope_lookup_request(lookup_id, peer_id, parent_root) {
|
||||
Ok(_) => {
|
||||
debug!(
|
||||
id = lookup_id,
|
||||
?block_root,
|
||||
?parent_root,
|
||||
"Requesting missing parent envelope"
|
||||
);
|
||||
Ok(LookupResult::Pending)
|
||||
}
|
||||
Err(e) => Err(LookupRequestError::SendFailedNetwork(e)),
|
||||
}
|
||||
}
|
||||
Action::Drop(reason) => {
|
||||
// Drop with noop
|
||||
Err(LookupRequestError::Failed(reason))
|
||||
@@ -849,33 +809,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Makes progress on lookups that were waiting for a parent envelope to be imported.
|
||||
pub fn continue_envelope_child_lookups(
|
||||
&mut self,
|
||||
block_root: Hash256,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) {
|
||||
let mut lookup_results = vec![];
|
||||
|
||||
for (id, lookup) in self.single_block_lookups.iter_mut() {
|
||||
if lookup.awaiting_envelope() == Some(block_root) {
|
||||
lookup.resolve_awaiting_envelope();
|
||||
debug!(
|
||||
envelope_root = ?block_root,
|
||||
id,
|
||||
block_root = ?lookup.block_root(),
|
||||
"Continuing lookup after envelope imported"
|
||||
);
|
||||
let result = lookup.continue_requests(cx);
|
||||
lookup_results.push((*id, result));
|
||||
}
|
||||
}
|
||||
|
||||
for (id, result) in lookup_results {
|
||||
self.on_lookup_result(id, result, "continue_envelope_child_lookups", cx);
|
||||
}
|
||||
}
|
||||
|
||||
/// Drops `dropped_id` lookup and all its children recursively. Lookups awaiting a parent need
|
||||
/// the parent to make progress to resolve, therefore we must drop them if the parent is
|
||||
/// dropped.
|
||||
|
||||
@@ -70,7 +70,6 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
|
||||
peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||
block_root: Hash256,
|
||||
awaiting_parent: Option<Hash256>,
|
||||
awaiting_envelope: Option<Hash256>,
|
||||
created: Instant,
|
||||
pub(crate) span: Span,
|
||||
}
|
||||
@@ -105,7 +104,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))),
|
||||
block_root: requested_block_root,
|
||||
awaiting_parent,
|
||||
awaiting_envelope: None,
|
||||
created: Instant::now(),
|
||||
span: lookup_span,
|
||||
}
|
||||
@@ -146,20 +144,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
self.awaiting_parent = None;
|
||||
}
|
||||
|
||||
pub fn awaiting_envelope(&self) -> Option<Hash256> {
|
||||
self.awaiting_envelope
|
||||
}
|
||||
|
||||
/// Mark this lookup as awaiting a parent envelope to be imported before processing.
|
||||
pub fn set_awaiting_envelope(&mut self, parent_root: Hash256) {
|
||||
self.awaiting_envelope = Some(parent_root);
|
||||
}
|
||||
|
||||
/// Mark this lookup as no longer awaiting a parent envelope.
|
||||
pub fn resolve_awaiting_envelope(&mut self) {
|
||||
self.awaiting_envelope = None;
|
||||
}
|
||||
|
||||
/// Returns the time elapsed since this lookup was created
|
||||
pub fn elapsed_since_created(&self) -> Duration {
|
||||
self.created.elapsed()
|
||||
@@ -201,7 +185,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
/// Returns true if this request is expecting some event to make progress
|
||||
pub fn is_awaiting_event(&self) -> bool {
|
||||
self.awaiting_parent.is_some()
|
||||
|| self.awaiting_envelope.is_some()
|
||||
|| self.block_request_state.state.is_awaiting_event()
|
||||
|| match &self.component_requests {
|
||||
// If components are waiting for the block request to complete, here we should
|
||||
@@ -304,7 +287,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
expected_blobs: usize,
|
||||
) -> Result<(), LookupRequestError> {
|
||||
let id = self.id;
|
||||
let awaiting_event = self.awaiting_parent.is_some() || self.awaiting_envelope.is_some();
|
||||
let awaiting_event = self.awaiting_parent.is_some();
|
||||
let request =
|
||||
R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
|
||||
|
||||
|
||||
@@ -74,8 +74,7 @@ use strum::IntoStaticStr;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{debug, error, info, trace};
|
||||
use types::{
|
||||
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock,
|
||||
SignedExecutionPayloadEnvelope, Slot,
|
||||
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot,
|
||||
};
|
||||
|
||||
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
|
||||
@@ -134,14 +133,6 @@ pub enum SyncMessage<E: EthSpec> {
|
||||
seen_timestamp: Duration,
|
||||
},
|
||||
|
||||
/// An execution payload envelope has been received from the RPC.
|
||||
RpcPayloadEnvelope {
|
||||
sync_request_id: SyncRequestId,
|
||||
peer_id: PeerId,
|
||||
envelope: Option<Arc<SignedExecutionPayloadEnvelope<E>>>,
|
||||
seen_timestamp: Duration,
|
||||
},
|
||||
|
||||
/// A block with an unknown parent has been received.
|
||||
UnknownParentBlock(PeerId, Arc<SignedBeaconBlock<E>>, Hash256),
|
||||
|
||||
@@ -193,7 +184,6 @@ pub enum BlockProcessType {
|
||||
SingleBlock { id: Id },
|
||||
SingleBlob { id: Id },
|
||||
SingleCustodyColumn(Id),
|
||||
SinglePayloadEnvelope { id: Id, block_root: Hash256 },
|
||||
}
|
||||
|
||||
impl BlockProcessType {
|
||||
@@ -201,8 +191,7 @@ impl BlockProcessType {
|
||||
match self {
|
||||
BlockProcessType::SingleBlock { id }
|
||||
| BlockProcessType::SingleBlob { id }
|
||||
| BlockProcessType::SingleCustodyColumn(id)
|
||||
| BlockProcessType::SinglePayloadEnvelope { id, .. } => *id,
|
||||
| BlockProcessType::SingleCustodyColumn(id) => *id,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -516,9 +505,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
SyncRequestId::DataColumnsByRange(req_id) => {
|
||||
self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error))
|
||||
}
|
||||
SyncRequestId::SinglePayloadEnvelope { id } => {
|
||||
self.on_single_envelope_response(id, peer_id, RpcEvent::RPCError(error))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -853,17 +839,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
} => {
|
||||
self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp)
|
||||
}
|
||||
SyncMessage::RpcPayloadEnvelope {
|
||||
sync_request_id,
|
||||
peer_id,
|
||||
envelope,
|
||||
seen_timestamp,
|
||||
} => self.rpc_payload_envelope_received(
|
||||
sync_request_id,
|
||||
peer_id,
|
||||
envelope,
|
||||
seen_timestamp,
|
||||
),
|
||||
SyncMessage::UnknownParentBlock(peer_id, block, block_root) => {
|
||||
let block_slot = block.slot();
|
||||
let parent_root = block.parent_root();
|
||||
@@ -1225,59 +1200,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
}
|
||||
}
|
||||
|
||||
fn rpc_payload_envelope_received(
|
||||
&mut self,
|
||||
sync_request_id: SyncRequestId,
|
||||
peer_id: PeerId,
|
||||
envelope: Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
|
||||
seen_timestamp: Duration,
|
||||
) {
|
||||
match sync_request_id {
|
||||
SyncRequestId::SinglePayloadEnvelope { id } => self.on_single_envelope_response(
|
||||
id,
|
||||
peer_id,
|
||||
RpcEvent::from_chunk(envelope, seen_timestamp),
|
||||
),
|
||||
_ => {
|
||||
crit!(%peer_id, "bad request id for payload envelope");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn on_single_envelope_response(
|
||||
&mut self,
|
||||
id: SingleLookupReqId,
|
||||
peer_id: PeerId,
|
||||
rpc_event: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
|
||||
) {
|
||||
if let Some(resp) = self
|
||||
.network
|
||||
.on_single_envelope_response(id, peer_id, rpc_event)
|
||||
{
|
||||
match resp {
|
||||
Ok((envelope, seen_timestamp)) => {
|
||||
let block_root = envelope.beacon_block_root();
|
||||
debug!(
|
||||
?block_root,
|
||||
%id,
|
||||
"Downloaded payload envelope, sending for processing"
|
||||
);
|
||||
if let Err(e) = self.network.send_envelope_for_processing(
|
||||
id.req_id,
|
||||
envelope,
|
||||
seen_timestamp,
|
||||
block_root,
|
||||
) {
|
||||
error!(error = ?e, "Failed to send envelope for processing");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(error = ?e, %id, "Payload envelope download failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn on_single_blob_response(
|
||||
&mut self,
|
||||
id: SingleLookupReqId,
|
||||
|
||||
@@ -37,7 +37,6 @@ pub use requests::LookupVerifyError;
|
||||
use requests::{
|
||||
ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems,
|
||||
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
|
||||
PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest,
|
||||
};
|
||||
#[cfg(test)]
|
||||
use slot_clock::SlotClock;
|
||||
@@ -53,7 +52,7 @@ use tracing::{Span, debug, debug_span, error, warn};
|
||||
use types::data::FixedBlobSidecarList;
|
||||
use types::{
|
||||
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
|
||||
ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
|
||||
ForkContext, Hash256, SignedBeaconBlock, Slot,
|
||||
};
|
||||
|
||||
pub mod custody;
|
||||
@@ -214,9 +213,6 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
|
||||
/// A mapping of active DataColumnsByRange requests
|
||||
data_columns_by_range_requests:
|
||||
ActiveRequests<DataColumnsByRangeRequestId, DataColumnsByRangeRequestItems<T::EthSpec>>,
|
||||
/// A mapping of active PayloadEnvelopesByRoot requests
|
||||
payload_envelopes_by_root_requests:
|
||||
ActiveRequests<SingleLookupReqId, PayloadEnvelopesByRootRequestItems<T::EthSpec>>,
|
||||
/// Mapping of active custody column requests for a block root
|
||||
custody_by_root_requests: FnvHashMap<CustodyRequester, ActiveCustodyRequest<T>>,
|
||||
|
||||
@@ -302,7 +298,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
blocks_by_range_requests: ActiveRequests::new("blocks_by_range"),
|
||||
blobs_by_range_requests: ActiveRequests::new("blobs_by_range"),
|
||||
data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"),
|
||||
payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"),
|
||||
custody_by_root_requests: <_>::default(),
|
||||
components_by_range_requests: FnvHashMap::default(),
|
||||
custody_backfill_data_column_batch_requests: FnvHashMap::default(),
|
||||
@@ -331,7 +326,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
blocks_by_range_requests,
|
||||
blobs_by_range_requests,
|
||||
data_columns_by_range_requests,
|
||||
payload_envelopes_by_root_requests,
|
||||
// custody_by_root_requests is a meta request of data_columns_by_root_requests
|
||||
custody_by_root_requests: _,
|
||||
// components_by_range_requests is a meta request of various _by_range requests
|
||||
@@ -367,17 +361,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
.active_requests_of_peer(peer_id)
|
||||
.into_iter()
|
||||
.map(|req_id| SyncRequestId::DataColumnsByRange(*req_id));
|
||||
let envelope_by_root_ids = payload_envelopes_by_root_requests
|
||||
.active_requests_of_peer(peer_id)
|
||||
.into_iter()
|
||||
.map(|id| SyncRequestId::SinglePayloadEnvelope { id: *id });
|
||||
blocks_by_root_ids
|
||||
.chain(blobs_by_root_ids)
|
||||
.chain(data_column_by_root_ids)
|
||||
.chain(blocks_by_range_ids)
|
||||
.chain(blobs_by_range_ids)
|
||||
.chain(data_column_by_range_ids)
|
||||
.chain(envelope_by_root_ids)
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -434,7 +423,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
blocks_by_range_requests,
|
||||
blobs_by_range_requests,
|
||||
data_columns_by_range_requests,
|
||||
payload_envelopes_by_root_requests,
|
||||
// custody_by_root_requests is a meta request of data_columns_by_root_requests
|
||||
custody_by_root_requests: _,
|
||||
// components_by_range_requests is a meta request of various _by_range requests
|
||||
@@ -457,7 +445,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
.chain(blocks_by_range_requests.iter_request_peers())
|
||||
.chain(blobs_by_range_requests.iter_request_peers())
|
||||
.chain(data_columns_by_range_requests.iter_request_peers())
|
||||
.chain(payload_envelopes_by_root_requests.iter_request_peers())
|
||||
{
|
||||
*active_request_count_by_peer.entry(peer_id).or_default() += 1;
|
||||
}
|
||||
@@ -940,57 +927,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
Ok(LookupRequestResult::RequestSent(id.req_id))
|
||||
}
|
||||
|
||||
/// Request a payload envelope for `block_root` from a peer.
|
||||
pub fn envelope_lookup_request(
|
||||
&mut self,
|
||||
lookup_id: SingleLookupId,
|
||||
peer_id: PeerId,
|
||||
block_root: Hash256,
|
||||
) -> Result<Id, RpcRequestSendError> {
|
||||
let id = SingleLookupReqId {
|
||||
lookup_id,
|
||||
req_id: self.next_id(),
|
||||
};
|
||||
|
||||
let request = PayloadEnvelopesByRootSingleRequest(block_root);
|
||||
|
||||
let network_request = RequestType::PayloadEnvelopesByRoot(
|
||||
request
|
||||
.into_request(&self.fork_context)
|
||||
.map_err(RpcRequestSendError::InternalError)?,
|
||||
);
|
||||
self.network_send
|
||||
.send(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request: network_request,
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::SinglePayloadEnvelope { id }),
|
||||
})
|
||||
.map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
|
||||
|
||||
debug!(
|
||||
method = "PayloadEnvelopesByRoot",
|
||||
?block_root,
|
||||
peer = %peer_id,
|
||||
%id,
|
||||
"Sync RPC request sent"
|
||||
);
|
||||
|
||||
let request_span = debug_span!(
|
||||
parent: Span::current(),
|
||||
"lh_outgoing_envelope_by_root_request",
|
||||
%block_root,
|
||||
);
|
||||
self.payload_envelopes_by_root_requests.insert(
|
||||
id,
|
||||
peer_id,
|
||||
true,
|
||||
PayloadEnvelopesByRootRequestItems::new(request),
|
||||
request_span,
|
||||
);
|
||||
|
||||
Ok(id.req_id)
|
||||
}
|
||||
|
||||
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
|
||||
/// - If we have a downloaded but not yet processed block
|
||||
/// - If the da_checker has a pending block
|
||||
@@ -1499,27 +1435,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
self.on_rpc_response_result(resp, peer_id)
|
||||
}
|
||||
|
||||
pub(crate) fn on_single_envelope_response(
|
||||
&mut self,
|
||||
id: SingleLookupReqId,
|
||||
peer_id: PeerId,
|
||||
rpc_event: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
|
||||
) -> Option<RpcResponseResult<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>> {
|
||||
let resp = self
|
||||
.payload_envelopes_by_root_requests
|
||||
.on_response(id, rpc_event);
|
||||
let resp = resp.map(|res| {
|
||||
res.and_then(|(mut envelopes, seen_timestamp)| {
|
||||
match envelopes.pop() {
|
||||
Some(envelope) => Ok((envelope, seen_timestamp)),
|
||||
// Should never happen, request items enforces at least 1 chunk.
|
||||
None => Err(LookupVerifyError::NotEnoughResponsesReturned { actual: 0 }.into()),
|
||||
}
|
||||
})
|
||||
});
|
||||
self.on_rpc_response_result(resp, peer_id)
|
||||
}
|
||||
|
||||
pub(crate) fn on_single_blob_response(
|
||||
&mut self,
|
||||
id: SingleLookupReqId,
|
||||
@@ -1695,33 +1610,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn send_envelope_for_processing(
|
||||
&self,
|
||||
id: Id,
|
||||
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
block_root: Hash256,
|
||||
) -> Result<(), SendErrorProcessor> {
|
||||
let beacon_processor = self
|
||||
.beacon_processor_if_enabled()
|
||||
.ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
|
||||
|
||||
debug!(?block_root, ?id, "Sending payload envelope for processing");
|
||||
beacon_processor
|
||||
.send_rpc_payload_envelope(
|
||||
envelope,
|
||||
seen_timestamp,
|
||||
BlockProcessType::SinglePayloadEnvelope { id, block_root },
|
||||
)
|
||||
.map_err(|e| {
|
||||
error!(
|
||||
error = ?e,
|
||||
"Failed to send sync envelope to processor"
|
||||
);
|
||||
SendErrorProcessor::SendError
|
||||
})
|
||||
}
|
||||
|
||||
pub fn send_blobs_for_processing(
|
||||
&self,
|
||||
id: Id,
|
||||
|
||||
@@ -16,9 +16,6 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems;
|
||||
pub use data_columns_by_root::{
|
||||
DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest,
|
||||
};
|
||||
pub use payload_envelopes_by_root::{
|
||||
PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest,
|
||||
};
|
||||
|
||||
use crate::metrics;
|
||||
|
||||
@@ -30,7 +27,6 @@ mod blocks_by_range;
|
||||
mod blocks_by_root;
|
||||
mod data_columns_by_range;
|
||||
mod data_columns_by_root;
|
||||
mod payload_envelopes_by_root;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
|
||||
pub enum LookupVerifyError {
|
||||
|
||||
@@ -1,53 +0,0 @@
|
||||
use lighthouse_network::rpc::methods::PayloadEnvelopesByRootRequest;
|
||||
use std::sync::Arc;
|
||||
use types::{EthSpec, ForkContext, Hash256, SignedExecutionPayloadEnvelope};
|
||||
|
||||
use super::{ActiveRequestItems, LookupVerifyError};
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct PayloadEnvelopesByRootSingleRequest(pub Hash256);
|
||||
|
||||
impl PayloadEnvelopesByRootSingleRequest {
|
||||
pub fn into_request(
|
||||
self,
|
||||
fork_context: &ForkContext,
|
||||
) -> Result<PayloadEnvelopesByRootRequest, String> {
|
||||
PayloadEnvelopesByRootRequest::new(vec![self.0], fork_context)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PayloadEnvelopesByRootRequestItems<E: EthSpec> {
|
||||
request: PayloadEnvelopesByRootSingleRequest,
|
||||
items: Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> PayloadEnvelopesByRootRequestItems<E> {
|
||||
pub fn new(request: PayloadEnvelopesByRootSingleRequest) -> Self {
|
||||
Self {
|
||||
request,
|
||||
items: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> ActiveRequestItems for PayloadEnvelopesByRootRequestItems<E> {
|
||||
type Item = Arc<SignedExecutionPayloadEnvelope<E>>;
|
||||
|
||||
/// Append a response to the single chunk request. If the chunk is valid, the request is
|
||||
/// resolved immediately.
|
||||
/// The active request SHOULD be dropped after `add_response` returns an error
|
||||
fn add(&mut self, envelope: Self::Item) -> Result<bool, LookupVerifyError> {
|
||||
let beacon_block_root = envelope.beacon_block_root();
|
||||
if self.request.0 != beacon_block_root {
|
||||
return Err(LookupVerifyError::UnrequestedBlockRoot(beacon_block_root));
|
||||
}
|
||||
|
||||
self.items.push(envelope);
|
||||
// Always returns true, payload envelopes by root expects a single response
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn consume(&mut self) -> Vec<Self::Item> {
|
||||
std::mem::take(&mut self.items)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user