Range sync

This commit is contained in:
Eitan Seri-Levi
2026-04-28 17:25:53 +02:00
parent 82dde267b5
commit 802f77f700
9 changed files with 414 additions and 35 deletions

View File

@@ -46,6 +46,7 @@ impl<E: EthSpec> LookupBlock<E> {
/// This includes any and all blobs/columns required, including zero if /// This includes any and all blobs/columns required, including zero if
/// none are required. This can happen if the block is pre-deneb or if /// none are required. This can happen if the block is pre-deneb or if
/// it's simply past the DA boundary. /// it's simply past the DA boundary.
#[derive(Clone)]
pub enum RangeSyncBlock<E: EthSpec> { pub enum RangeSyncBlock<E: EthSpec> {
Base(AvailableBlock<E>), Base(AvailableBlock<E>),
Gloas { Gloas {

View File

@@ -48,7 +48,7 @@ pub struct EnvelopeImportData<E: EthSpec> {
_phantom: PhantomData<E>, _phantom: PhantomData<E>,
} }
#[derive(Debug)] #[derive(Debug, Clone)]
#[allow(dead_code)] #[allow(dead_code)]
pub struct AvailableEnvelope<E: EthSpec> { pub struct AvailableEnvelope<E: EthSpec> {
execution_block_hash: ExecutionBlockHash, execution_block_hash: ExecutionBlockHash,

View File

@@ -1831,6 +1831,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
error!(error = %e, "Internal block gossip validation error"); error!(error = %e, "Internal block gossip validation error");
return None; return None;
} }
Err(BlockError::ParentEnvelopeUnknown { .. }) => {
// Gossip validation does not check envelope availability; this should not occur.
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return None;
}
Err(e @ BlockError::EnvelopeError(_)) => {
debug!(error = %e, "Gossip block envelope error");
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return None;
}
Err(e @ BlockError::PayloadEnvelopeError { .. }) => {
debug!(error = %e, "Gossip block payload envelope error");
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return None;
}
}; };
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL); metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);

View File

@@ -33,6 +33,7 @@ pub type BatchId = Epoch;
#[strum(serialize_all = "snake_case")] #[strum(serialize_all = "snake_case")]
pub enum ByRangeRequestType { pub enum ByRangeRequestType {
BlocksAndColumns, BlocksAndColumns,
BlocksAndEnvelopesAndColumns,
BlocksAndBlobs, BlocksAndBlobs,
Blocks, Blocks,
Columns(HashSet<u64>), Columns(HashSet<u64>),

View File

@@ -4,11 +4,13 @@ use beacon_chain::{
data_availability_checker::DataAvailabilityChecker, data_availability_checker::DataAvailabilityChecker,
data_column_verification::CustodyDataColumn, data_column_verification::CustodyDataColumn,
get_block_root, get_block_root,
payload_envelope_verification::AvailableEnvelope,
}; };
use lighthouse_network::{ use lighthouse_network::{
PeerId, PeerId,
service::api_types::{ service::api_types::{
BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId,
PayloadEnvelopesByRangeRequestId,
}, },
}; };
use ssz_types::RuntimeVariableList; use ssz_types::RuntimeVariableList;
@@ -16,7 +18,7 @@ use std::{collections::HashMap, sync::Arc};
use tracing::{Span, debug}; use tracing::{Span, debug};
use types::{ use types::{
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
Hash256, SignedBeaconBlock, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope,
}; };
use crate::sync::network_context::MAX_COLUMN_RETRIES; use crate::sync::network_context::MAX_COLUMN_RETRIES;
@@ -35,6 +37,13 @@ use crate::sync::network_context::MAX_COLUMN_RETRIES;
pub struct RangeBlockComponentsRequest<E: EthSpec> { pub struct RangeBlockComponentsRequest<E: EthSpec> {
/// Blocks we have received awaiting for their corresponding sidecar. /// Blocks we have received awaiting for their corresponding sidecar.
blocks_request: ByRangeRequest<BlocksByRangeRequestId, Vec<Arc<SignedBeaconBlock<E>>>>, blocks_request: ByRangeRequest<BlocksByRangeRequestId, Vec<Arc<SignedBeaconBlock<E>>>>,
/// Payload envelopes (Gloas+). None for pre-Gloas forks.
payloads_request: Option<
ByRangeRequest<
PayloadEnvelopesByRangeRequestId,
Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
>,
>,
/// Sidecars we have received awaiting for their corresponding block. /// Sidecars we have received awaiting for their corresponding block.
block_data_request: RangeBlockDataRequest<E>, block_data_request: RangeBlockDataRequest<E>,
/// Span to track the range request and all children range requests. /// Span to track the range request and all children range requests.
@@ -88,6 +97,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
Vec<(DataColumnsByRangeRequestId, Vec<ColumnIndex>)>, Vec<(DataColumnsByRangeRequestId, Vec<ColumnIndex>)>,
Vec<ColumnIndex>, Vec<ColumnIndex>,
)>, )>,
payloads_req_id: Option<PayloadEnvelopesByRangeRequestId>,
request_span: Span, request_span: Span,
) -> Self { ) -> Self {
let block_data_request = if let Some(blobs_req_id) = blobs_req_id { let block_data_request = if let Some(blobs_req_id) = blobs_req_id {
@@ -109,6 +119,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
Self { Self {
blocks_request: ByRangeRequest::Active(blocks_req_id), blocks_request: ByRangeRequest::Active(blocks_req_id),
payloads_request: payloads_req_id.map(ByRangeRequest::Active),
block_data_request, block_data_request,
request_span, request_span,
} }
@@ -191,6 +202,18 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
} }
} }
/// Adds received payload envelopes to the request.
pub fn add_payload_envelopes(
&mut self,
req_id: PayloadEnvelopesByRangeRequestId,
envelopes: Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
) -> Result<(), String> {
match &mut self.payloads_request {
Some(req) => req.finish(req_id, envelopes),
None => Err("received payload envelopes but none expected".to_owned()),
}
}
/// Attempts to construct RPC blocks from all received components. /// Attempts to construct RPC blocks from all received components.
/// ///
/// Returns `None` if not all expected requests have completed. /// Returns `None` if not all expected requests have completed.
@@ -208,6 +231,13 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
return None; return None;
}; };
// If payloads are expected, they must also be complete before we can produce responses.
if let Some(payloads_req) = &self.payloads_request
&& payloads_req.to_finished().is_none()
{
return None;
}
// Increment the attempt once this function returns the response or errors // Increment the attempt once this function returns the response or errors
match &mut self.block_data_request { match &mut self.block_data_request {
RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs( RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs(
@@ -254,15 +284,29 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
} }
} }
let resp = Self::responses_with_custody_columns( // Gloas path: if payloads are present, produce Gloas blocks
blocks.to_vec(), let resp = if let Some(payloads_req) = &self.payloads_request {
data_columns, let payloads = payloads_req.to_finished().expect("checked above").to_vec();
column_to_peer_id, Self::responses_gloas(
expected_custody_columns, blocks.to_vec(),
*attempt, payloads,
da_checker, data_columns,
spec, column_to_peer_id,
); expected_custody_columns,
*attempt,
spec,
)
} else {
Self::responses_with_custody_columns(
blocks.to_vec(),
data_columns,
column_to_peer_id,
expected_custody_columns,
*attempt,
da_checker,
spec,
)
};
if let Err(CouplingError::DataColumnPeerFailure { if let Err(CouplingError::DataColumnPeerFailure {
error: _, error: _,
@@ -460,6 +504,124 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
Ok(range_sync_blocks) Ok(range_sync_blocks)
} }
/// Couples blocks with payload envelopes and custody columns for Gloas range sync.
fn responses_gloas(
blocks: Vec<Arc<SignedBeaconBlock<E>>>,
payloads: Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
data_columns: DataColumnSidecarList<E>,
column_to_peer: HashMap<u64, PeerId>,
expects_custody_columns: &[ColumnIndex],
attempt: usize,
spec: Arc<ChainSpec>,
) -> Result<Vec<RangeSyncBlock<E>>, CouplingError> {
let mut data_columns_by_block =
HashMap::<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>::new();
for column in data_columns {
let block_root = column.block_root();
let index = *column.index();
if data_columns_by_block
.entry(block_root)
.or_default()
.insert(index, column)
.is_some()
{
debug!(?block_root, ?index, "Repeated column for block_root");
}
}
let mut range_sync_blocks = Vec::with_capacity(blocks.len());
let mut payload_iter = payloads.into_iter().peekable();
let exceeded_retries = attempt >= MAX_COLUMN_RETRIES;
for block in blocks {
let mut envelope_for_block = None;
if payload_iter
.peek()
.map(|e| e.message.slot() == block.slot())
.unwrap_or(false)
{
envelope_for_block = payload_iter.next();
}
let block_root = get_block_root(&block);
let available_envelope = if block.num_expected_blobs() > 0 {
let envelope = envelope_for_block.ok_or_else(|| {
CouplingError::InternalError(format!(
"Missing payload envelope for block {block_root:?} with blobs"
))
})?;
let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root)
else {
let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect();
return Err(CouplingError::DataColumnPeerFailure {
error: format!("No columns for block {block_root:?} with data"),
faulty_peers: responsible_peers,
exceeded_retries,
});
};
let mut custody_columns = vec![];
let mut naughty_peers = vec![];
for index in expects_custody_columns {
if let Some(data_column) = data_columns_by_index.remove(index) {
custody_columns.push(data_column);
} else {
let Some(responsible_peer) = column_to_peer.get(index) else {
return Err(CouplingError::InternalError(format!(
"Internal error, no request made for column {index}"
)));
};
naughty_peers.push((*index, *responsible_peer));
}
}
if !naughty_peers.is_empty() {
return Err(CouplingError::DataColumnPeerFailure {
error: format!(
"Peers did not return column for block_root {block_root:?} {naughty_peers:?}"
),
faulty_peers: naughty_peers,
exceeded_retries,
});
}
Some(Box::new(AvailableEnvelope::new(
envelope.block_hash(),
envelope,
custody_columns,
None,
spec.clone(),
)))
} else {
envelope_for_block.map(|envelope| {
Box::new(AvailableEnvelope::new(
envelope.block_hash(),
envelope,
vec![],
None,
spec.clone(),
))
})
};
range_sync_blocks.push(RangeSyncBlock::new_gloas(block, available_envelope));
}
if payload_iter.next().is_some() {
let remaining = payload_iter.count() + 1;
debug!(remaining, "Received payload envelopes that don't pair with blocks");
}
if !data_columns_by_block.is_empty() {
let remaining_roots = data_columns_by_block.keys().collect::<Vec<_>>();
debug!(?remaining_roots, "Not all columns consumed for Gloas blocks");
}
Ok(range_sync_blocks)
}
} }
impl<I: PartialEq + std::fmt::Display, T> ByRangeRequest<I, T> { impl<I: PartialEq + std::fmt::Display, T> ByRangeRequest<I, T> {
@@ -560,7 +722,7 @@ mod tests {
let blocks_req_id = blocks_id(components_id()); let blocks_req_id = blocks_id(components_id());
let mut info = let mut info =
RangeBlockComponentsRequest::<E>::new(blocks_req_id, None, None, Span::none()); RangeBlockComponentsRequest::<E>::new(blocks_req_id, None, None, None, Span::none());
// Send blocks and complete terminate response // Send blocks and complete terminate response
info.add_blocks(blocks_req_id, blocks).unwrap(); info.add_blocks(blocks_req_id, blocks).unwrap();

View File

@@ -59,7 +59,8 @@ use lighthouse_network::service::api_types::{
BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyRequester, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyRequester,
DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId,
DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId,
SyncRequestId,
}; };
use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::types::{NetworkGlobals, SyncState};
use lighthouse_network::{PeerAction, PeerId}; use lighthouse_network::{PeerAction, PeerId};
@@ -73,7 +74,8 @@ use strum::IntoStaticStr;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tracing::{debug, error, info, trace}; use tracing::{debug, error, info, trace};
use types::{ use types::{
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock,
SignedExecutionPayloadEnvelope, Slot,
}; };
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
@@ -512,6 +514,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncRequestId::DataColumnsByRange(req_id) => { SyncRequestId::DataColumnsByRange(req_id) => {
self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error))
} }
SyncRequestId::SinglePayloadEnvelope { id } => {
self.on_single_envelope_response(id, peer_id, RpcEvent::RPCError(error))
}
SyncRequestId::PayloadEnvelopesByRange(req_id) => self
.on_payload_envelopes_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)),
} }
} }
@@ -1331,6 +1338,36 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} }
} }
fn on_single_envelope_response(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
rpc_event: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) {
// Placeholder: by-root envelope lookup not yet implemented for range sync.
// This is called on error injection for disconnected peers. Log and ignore.
let _ = (id, peer_id, rpc_event);
debug!("on_single_envelope_response: not yet implemented");
}
fn on_payload_envelopes_by_range_response(
&mut self,
id: PayloadEnvelopesByRangeRequestId,
peer_id: PeerId,
envelope: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) {
if let Some(resp) = self
.network
.on_payload_envelopes_by_range_response(id, peer_id, envelope)
{
self.on_range_components_response(
id.parent_request_id,
peer_id,
RangeBlockComponent::PayloadEnvelope(id, resp),
);
}
}
fn on_custody_by_root_result( fn on_custody_by_root_result(
&mut self, &mut self,
requester: CustodyRequester, requester: CustodyRequester,

View File

@@ -22,14 +22,17 @@ use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState};
use custody::CustodyRequestResult; use custody::CustodyRequestResult;
use fnv::FnvHashMap; use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest}; use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, DataColumnsByRangeRequest, PayloadEnvelopesByRangeRequest,
};
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType}; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType};
pub use lighthouse_network::service::api_types::RangeRequestId; pub use lighthouse_network::service::api_types::RangeRequestId;
use lighthouse_network::service::api_types::{ use lighthouse_network::service::api_types::{
AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyId, CustodyRequester, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyId, CustodyRequester,
DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId,
DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId,
SyncRequestId,
}; };
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource};
use parking_lot::RwLock; use parking_lot::RwLock;
@@ -37,6 +40,7 @@ pub use requests::LookupVerifyError;
use requests::{ use requests::{
ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems,
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
PayloadEnvelopesByRangeRequestItems,
}; };
#[cfg(test)] #[cfg(test)]
use slot_clock::SlotClock; use slot_clock::SlotClock;
@@ -52,7 +56,7 @@ use tracing::{Span, debug, debug_span, error, warn};
use types::data::FixedBlobSidecarList; use types::data::FixedBlobSidecarList;
use types::{ use types::{
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
ForkContext, Hash256, SignedBeaconBlock, Slot, ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
}; };
pub mod custody; pub mod custody;
@@ -213,6 +217,11 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// A mapping of active DataColumnsByRange requests /// A mapping of active DataColumnsByRange requests
data_columns_by_range_requests: data_columns_by_range_requests:
ActiveRequests<DataColumnsByRangeRequestId, DataColumnsByRangeRequestItems<T::EthSpec>>, ActiveRequests<DataColumnsByRangeRequestId, DataColumnsByRangeRequestItems<T::EthSpec>>,
/// A mapping of active PayloadEnvelopesByRange requests
payload_envelopes_by_range_requests: ActiveRequests<
PayloadEnvelopesByRangeRequestId,
PayloadEnvelopesByRangeRequestItems<T::EthSpec>,
>,
/// Mapping of active custody column requests for a block root /// Mapping of active custody column requests for a block root
custody_by_root_requests: FnvHashMap<CustodyRequester, ActiveCustodyRequest<T>>, custody_by_root_requests: FnvHashMap<CustodyRequester, ActiveCustodyRequest<T>>,
@@ -250,6 +259,10 @@ pub enum RangeBlockComponent<E: EthSpec> {
DataColumnsByRangeRequestId, DataColumnsByRangeRequestId,
RpcResponseResult<Vec<Arc<DataColumnSidecar<E>>>>, RpcResponseResult<Vec<Arc<DataColumnSidecar<E>>>>,
), ),
PayloadEnvelope(
PayloadEnvelopesByRangeRequestId,
RpcResponseResult<Vec<Arc<SignedExecutionPayloadEnvelope<E>>>>,
),
} }
#[cfg(test)] #[cfg(test)]
@@ -298,6 +311,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blocks_by_range_requests: ActiveRequests::new("blocks_by_range"),
blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"),
data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"),
payload_envelopes_by_range_requests: ActiveRequests::new("payload_envelopes_by_range"),
custody_by_root_requests: <_>::default(), custody_by_root_requests: <_>::default(),
components_by_range_requests: FnvHashMap::default(), components_by_range_requests: FnvHashMap::default(),
custody_backfill_data_column_batch_requests: FnvHashMap::default(), custody_backfill_data_column_batch_requests: FnvHashMap::default(),
@@ -326,6 +340,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
blocks_by_range_requests, blocks_by_range_requests,
blobs_by_range_requests, blobs_by_range_requests,
data_columns_by_range_requests, data_columns_by_range_requests,
payload_envelopes_by_range_requests,
// custody_by_root_requests is a meta request of data_columns_by_root_requests // custody_by_root_requests is a meta request of data_columns_by_root_requests
custody_by_root_requests: _, custody_by_root_requests: _,
// components_by_range_requests is a meta request of various _by_range requests // components_by_range_requests is a meta request of various _by_range requests
@@ -361,12 +376,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.active_requests_of_peer(peer_id) .active_requests_of_peer(peer_id)
.into_iter() .into_iter()
.map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id));
let payload_envelope_by_range_ids = payload_envelopes_by_range_requests
.active_requests_of_peer(peer_id)
.into_iter()
.map(|req_id| SyncRequestId::PayloadEnvelopesByRange(*req_id));
blocks_by_root_ids blocks_by_root_ids
.chain(blobs_by_root_ids) .chain(blobs_by_root_ids)
.chain(data_column_by_root_ids) .chain(data_column_by_root_ids)
.chain(blocks_by_range_ids) .chain(blocks_by_range_ids)
.chain(blobs_by_range_ids) .chain(blobs_by_range_ids)
.chain(data_column_by_range_ids) .chain(data_column_by_range_ids)
.chain(payload_envelope_by_range_ids)
.collect() .collect()
} }
@@ -423,6 +443,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
blocks_by_range_requests, blocks_by_range_requests,
blobs_by_range_requests, blobs_by_range_requests,
data_columns_by_range_requests, data_columns_by_range_requests,
payload_envelopes_by_range_requests,
// custody_by_root_requests is a meta request of data_columns_by_root_requests // custody_by_root_requests is a meta request of data_columns_by_root_requests
custody_by_root_requests: _, custody_by_root_requests: _,
// components_by_range_requests is a meta request of various _by_range requests // components_by_range_requests is a meta request of various _by_range requests
@@ -445,6 +466,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.chain(blocks_by_range_requests.iter_request_peers()) .chain(blocks_by_range_requests.iter_request_peers())
.chain(blobs_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers())
.chain(data_columns_by_range_requests.iter_request_peers()) .chain(data_columns_by_range_requests.iter_request_peers())
.chain(payload_envelopes_by_range_requests.iter_request_peers())
{ {
*active_request_count_by_peer.entry(peer_id).or_default() += 1; *active_request_count_by_peer.entry(peer_id).or_default() += 1;
} }
@@ -577,24 +599,26 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}; };
// Attempt to find all required custody peers before sending any request or creating an ID // Attempt to find all required custody peers before sending any request or creating an ID
let columns_by_range_peers_to_request = let columns_by_range_peers_to_request = if matches!(
if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { batch_type,
let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); ByRangeRequestType::BlocksAndColumns | ByRangeRequestType::BlocksAndEnvelopesAndColumns
let column_indexes = self ) {
.chain let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch());
.sampling_columns_for_epoch(epoch) let column_indexes = self
.iter() .chain
.cloned() .sampling_columns_for_epoch(epoch)
.collect(); .iter()
Some(self.select_columns_by_range_peers_to_request( .cloned()
&column_indexes, .collect();
column_peers, Some(self.select_columns_by_range_peers_to_request(
active_request_count_by_peer, &column_indexes,
peers_to_deprioritize, column_peers,
)?) active_request_count_by_peer,
} else { peers_to_deprioritize,
None )?)
}; } else {
None
};
// Create the overall components_by_range request ID before its individual components // Create the overall components_by_range request ID before its individual components
let id = ComponentsByRangeRequestId { let id = ComponentsByRangeRequestId {
@@ -659,6 +683,28 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.transpose()?; .transpose()?;
let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch());
// Send envelope request for Gloas epochs
let payloads_req_id =
if matches!(batch_type, ByRangeRequestType::BlocksAndEnvelopesAndColumns) {
Some(self.send_payload_envelopes_by_range_request(
block_peer,
PayloadEnvelopesByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
},
id,
new_range_request_span!(
self,
"outgoing_envelopes_by_range",
range_request_span.clone(),
block_peer
),
)?)
} else {
None
};
let info = RangeBlockComponentsRequest::new( let info = RangeBlockComponentsRequest::new(
blocks_req_id, blocks_req_id,
blobs_req_id, blobs_req_id,
@@ -668,6 +714,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.chain.sampling_columns_for_epoch(epoch).to_vec(), self.chain.sampling_columns_for_epoch(epoch).to_vec(),
) )
}), }),
payloads_req_id,
range_request_span, range_request_span,
); );
self.components_by_range_requests.insert(id, info); self.components_by_range_requests.insert(id, info);
@@ -770,6 +817,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}) })
}) })
} }
RangeBlockComponent::PayloadEnvelope(req_id, resp) => {
resp.and_then(|(envelopes, _)| {
request
.add_payload_envelopes(req_id, envelopes)
.map_err(|e| {
RpcResponseError::BlockComponentCouplingError(
CouplingError::InternalError(e),
)
})
})
}
} }
} { } {
entry.remove(); entry.remove();
@@ -1288,6 +1346,57 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok((id, requested_columns)) Ok((id, requested_columns))
} }
fn send_payload_envelopes_by_range_request(
&mut self,
peer_id: PeerId,
request: PayloadEnvelopesByRangeRequest,
parent_request_id: ComponentsByRangeRequestId,
request_span: Span,
) -> Result<PayloadEnvelopesByRangeRequestId, RpcRequestSendError> {
let id = PayloadEnvelopesByRangeRequestId {
id: self.next_id(),
parent_request_id,
};
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: RequestType::PayloadEnvelopesByRange(request.clone()),
app_request_id: AppRequestId::Sync(SyncRequestId::PayloadEnvelopesByRange(id)),
})
.map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "PayloadEnvelopesByRange",
slots = request.count,
epoch = %Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()),
peer = %peer_id,
%id,
"Sync RPC request sent"
);
self.payload_envelopes_by_range_requests.insert(
id,
peer_id,
false,
PayloadEnvelopesByRangeRequestItems::new(request),
request_span,
);
Ok(id)
}
#[allow(clippy::type_complexity)]
pub(crate) fn on_payload_envelopes_by_range_response(
&mut self,
id: PayloadEnvelopesByRangeRequestId,
peer_id: PeerId,
rpc_event: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) -> Option<RpcResponseResult<Vec<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>>> {
let resp = self
.payload_envelopes_by_range_requests
.on_response(id, rpc_event);
self.on_rpc_response_result(resp, peer_id)
}
pub fn is_execution_engine_online(&self) -> bool { pub fn is_execution_engine_online(&self) -> bool {
self.execution_engine_state == EngineState::Online self.execution_engine_state == EngineState::Online
} }
@@ -1369,6 +1478,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
); );
if self if self
.chain
.data_availability_checker
.envelopes_required_for_epoch(epoch)
{
ByRangeRequestType::BlocksAndEnvelopesAndColumns
} else if self
.chain .chain
.data_availability_checker .data_availability_checker
.data_columns_required_for_epoch(epoch) .data_columns_required_for_epoch(epoch)
@@ -1788,6 +1903,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
"data_columns_by_range", "data_columns_by_range",
self.data_columns_by_range_requests.len(), self.data_columns_by_range_requests.len(),
), ),
(
"payload_envelopes_by_range",
self.payload_envelopes_by_range_requests.len(),
),
("custody_by_root", self.custody_by_root_requests.len()), ("custody_by_root", self.custody_by_root_requests.len()),
( (
"components_by_range", "components_by_range",

View File

@@ -16,6 +16,7 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems;
pub use data_columns_by_root::{ pub use data_columns_by_root::{
DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest,
}; };
pub use payload_envelopes_by_range::PayloadEnvelopesByRangeRequestItems;
use crate::metrics; use crate::metrics;
@@ -27,6 +28,7 @@ mod blocks_by_range;
mod blocks_by_root; mod blocks_by_root;
mod data_columns_by_range; mod data_columns_by_range;
mod data_columns_by_root; mod data_columns_by_root;
mod payload_envelopes_by_range;
#[derive(Debug, PartialEq, Eq, IntoStaticStr)] #[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum LookupVerifyError { pub enum LookupVerifyError {

View File

@@ -0,0 +1,42 @@
use super::{ActiveRequestItems, LookupVerifyError};
use lighthouse_network::rpc::methods::PayloadEnvelopesByRangeRequest;
use std::sync::Arc;
use types::{EthSpec, SignedExecutionPayloadEnvelope};
/// Accumulates results of a payload_envelopes_by_range request. Only returns items after
/// receiving the stream termination.
pub struct PayloadEnvelopesByRangeRequestItems<E: EthSpec> {
request: PayloadEnvelopesByRangeRequest,
items: Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
}
impl<E: EthSpec> PayloadEnvelopesByRangeRequestItems<E> {
pub fn new(request: PayloadEnvelopesByRangeRequest) -> Self {
Self {
request,
items: vec![],
}
}
}
impl<E: EthSpec> ActiveRequestItems for PayloadEnvelopesByRangeRequestItems<E> {
type Item = Arc<SignedExecutionPayloadEnvelope<E>>;
fn add(&mut self, envelope: Self::Item) -> Result<bool, LookupVerifyError> {
let slot = envelope.slot();
if slot < self.request.start_slot || slot >= self.request.start_slot + self.request.count {
return Err(LookupVerifyError::UnrequestedSlot(slot));
}
if self.items.iter().any(|existing| existing.slot() == slot) {
return Err(LookupVerifyError::DuplicatedData(slot, 0));
}
self.items.push(envelope);
Ok(false)
}
fn consume(&mut self) -> Vec<Self::Item> {
std::mem::take(&mut self.items)
}
}