Gloas range sync (#9362)

N/A


  Implement range sync in gloas.
Basically requests blocks and payloads post gloas from the same peer, couples them and sends it for processing.
Does not change sync much at all other than adding the machinery for payloads by range requests.

Main changes are:
`RangeSyncBlock` which used to be a struct is an enum to account for the Gloas case. This allows a clear separation between gloas and pre-gloas code.
`AvailableBlockData` now has a `BlockInEnvelope` variant. This is to clearly indicate the post gloas case. I feel this is simpler to follow compared to `NoData` variant.


Tries to extract post gloas logic into its own functions so that there is minimal logic change in mainnet range sync behaviour.

This is meant as a stable base on which we can iterate further to make range sync cleaner and for unblocking range sync support on devnet. Some ideas for later is removing the retry mechanism in favour of delegating column fetching to lookup sync which can be done post #9155 and batch signature verifying envelopes.


Co-Authored-By: Pawan Dhananjay <pawandhananjay@gmail.com>

Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>

Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>
This commit is contained in:
Pawan Dhananjay
2026-06-10 16:00:57 +05:30
committed by GitHub
parent 844c6dd4a0
commit db3192e001
24 changed files with 1311 additions and 232 deletions

View File

@@ -351,6 +351,9 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
CouplingError::BlobPeerFailure(msg) => {
debug!(?batch_id, msg, "Blob peer failure");
}
CouplingError::EnvelopePeerFailure(msg) => {
debug!(?batch_id, msg, "Envelope peer failure");
}
CouplingError::InternalError(msg) => {
error!(?batch_id, msg, "Block components coupling internal error");
}

View File

@@ -34,6 +34,7 @@ pub type BatchId = Epoch;
pub enum ByRangeRequestType {
BlocksAndColumns,
BlocksAndBlobs,
BlocksAndEnvelopesAndColumns,
Blocks,
Columns(HashSet<u64>),
}

View File

@@ -1,3 +1,4 @@
use beacon_chain::payload_envelope_verification::AvailableEnvelope;
use beacon_chain::{
BeaconChainTypes,
block_verification_types::{AvailableBlockData, RangeSyncBlock},
@@ -9,14 +10,15 @@ use lighthouse_network::{
PeerId,
service::api_types::{
BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId,
PayloadEnvelopesByRangeRequestId,
},
};
use ssz_types::RuntimeVariableList;
use std::{collections::HashMap, sync::Arc};
use tracing::{Span, debug};
use tracing::{Span, debug, warn};
use types::{
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
Hash256, SignedBeaconBlock,
Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope,
};
use crate::sync::network_context::MAX_COLUMN_RETRIES;
@@ -37,6 +39,13 @@ pub struct RangeBlockComponentsRequest<E: EthSpec> {
blocks_request: ByRangeRequest<BlocksByRangeRequestId, Vec<Arc<SignedBeaconBlock<E>>>>,
/// Sidecars we have received awaiting for their corresponding block.
block_data_request: RangeBlockDataRequest<E>,
/// Payload envelopes for Gloas blocks.
payloads_request: Option<
ByRangeRequest<
PayloadEnvelopesByRangeRequestId,
Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
>,
>,
/// Span to track the range request and all children range requests.
pub(crate) request_span: Span,
}
@@ -71,6 +80,7 @@ pub enum CouplingError {
exceeded_retries: bool,
},
BlobPeerFailure(String),
EnvelopePeerFailure(String),
}
impl<E: EthSpec> RangeBlockComponentsRequest<E> {
@@ -88,6 +98,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
Vec<(DataColumnsByRangeRequestId, Vec<ColumnIndex>)>,
Vec<ColumnIndex>,
)>,
payloads_req_id: Option<PayloadEnvelopesByRangeRequestId>,
request_span: Span,
) -> Self {
let block_data_request = if let Some(blobs_req_id) = blobs_req_id {
@@ -110,6 +121,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
Self {
blocks_request: ByRangeRequest::Active(blocks_req_id),
block_data_request,
payloads_request: payloads_req_id.map(ByRangeRequest::Active),
request_span,
}
}
@@ -191,6 +203,17 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
}
}
pub fn add_payload_envelopes(
&mut self,
req_id: PayloadEnvelopesByRangeRequestId,
envelopes: Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
) -> Result<(), String> {
match &mut self.payloads_request {
Some(req) => req.finish(req_id, envelopes),
None => Err("received payload envelopes but none were expected".to_owned()),
}
}
/// Attempts to construct RPC blocks from all received components.
///
/// Returns `None` if not all expected requests have completed.
@@ -208,6 +231,11 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
return None;
};
// Check if payload envelopes are still pending
if let Some(ByRangeRequest::Active(_)) = &self.payloads_request {
return None;
}
// Increment the attempt once this function returns the response or errors
match &mut self.block_data_request {
RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs(
@@ -254,6 +282,12 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
}
}
let payload_envelopes = self.payloads_request.as_ref().and_then(|request| {
request
.to_finished()
.map(|payload_envelopes| payload_envelopes.to_vec())
});
let resp = Self::responses_with_custody_columns(
blocks.to_vec(),
data_columns,
@@ -262,6 +296,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
*attempt,
da_checker,
spec,
payload_envelopes,
);
if let Err(CouplingError::DataColumnPeerFailure {
@@ -352,6 +387,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
Ok(responses)
}
#[allow(clippy::too_many_arguments)]
fn responses_with_custody_columns<T>(
blocks: Vec<Arc<SignedBeaconBlock<E>>>,
data_columns: DataColumnSidecarList<E>,
@@ -360,10 +396,19 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
attempt: usize,
da_checker: Arc<DataAvailabilityChecker<T>>,
spec: Arc<ChainSpec>,
payload_envelopes: Option<Vec<Arc<SignedExecutionPayloadEnvelope<E>>>>,
) -> Result<Vec<RangeSyncBlock<E>>, CouplingError>
where
T: BeaconChainTypes<EthSpec = E>,
{
// Index envelopes by beacon_block_root for correct coupling.
let mut envelopes_by_block_root = payload_envelopes.map(|envelopes| {
envelopes
.into_iter()
.map(|e| (e.beacon_block_root(), e))
.collect::<HashMap<_, _>>()
});
// Group data columns by block_root and index
let mut data_columns_by_block =
HashMap::<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>::new();
@@ -393,7 +438,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
let exceeded_retries = attempt >= MAX_COLUMN_RETRIES;
for block in blocks {
let block_root = get_block_root(&block);
range_sync_blocks.push(if block.num_expected_blobs() > 0 {
let custody_columns = if block.num_expected_blobs() > 0 {
let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root)
else {
let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect();
@@ -401,7 +446,6 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
error: format!("No columns for block {block_root:?} with data"),
faulty_peers: responsible_peers,
exceeded_retries,
});
};
@@ -415,16 +459,21 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column));
} else {
let Some(responsible_peer) = column_to_peer.get(index) else {
return Err(CouplingError::InternalError(format!("Internal error, no request made for column {}", index)));
return Err(CouplingError::InternalError(format!(
"Internal error, no request made for column {}",
index
)));
};
naughty_peers.push((*index, *responsible_peer));
}
}
if !naughty_peers.is_empty() {
return Err(CouplingError::DataColumnPeerFailure {
error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"),
error: format!(
"Peers did not return column for block_root {block_root:?} {naughty_peers:?}"
),
faulty_peers: naughty_peers,
exceeded_retries
exceeded_retries,
});
}
@@ -439,15 +488,31 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
);
}
let block_data = AvailableBlockData::new_with_data_columns(custody_columns.iter().map(|c| c.as_data_column().clone()).collect::<Vec<_>>());
RangeSyncBlock::new(block, block_data, &da_checker, spec.clone())
.map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
custody_columns
.iter()
.map(|c| c.as_data_column().clone())
.collect::<Vec<_>>()
} else {
// Block has no data, expects zero columns
vec![]
};
let range_sync_block = if let Some(envelopes_by_block_root) =
envelopes_by_block_root.as_mut()
{
let envelope = envelopes_by_block_root.remove(&block_root);
let available_envelope =
envelope.map(|env| AvailableEnvelope::new(env, custody_columns));
RangeSyncBlock::new_gloas(block, available_envelope)
.map_err(CouplingError::EnvelopePeerFailure)?
} else if custody_columns.is_empty() {
RangeSyncBlock::new(block, AvailableBlockData::NoData, &da_checker, spec.clone())
.map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
});
} else {
let block_data = AvailableBlockData::new_with_data_columns(custody_columns);
RangeSyncBlock::new(block, block_data, &da_checker, spec.clone())
.map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
};
range_sync_blocks.push(range_sync_block);
}
// Assert that there are no columns left for other blocks
@@ -458,6 +523,13 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
debug!(?remaining_roots, "Not all columns consumed for block");
}
// Recoverable error, log and continue
if let Some(envelopes_by_block_root) = envelopes_by_block_root
&& !envelopes_by_block_root.is_empty()
{
warn!("Peer returned extra envelopes not matching any block");
}
Ok(range_sync_blocks)
}
}
@@ -489,21 +561,28 @@ mod tests {
use crate::sync::network_context::MAX_COLUMN_RETRIES;
use super::RangeBlockComponentsRequest;
use beacon_chain::block_verification_types::RangeSyncBlock;
use beacon_chain::custody_context::NodeCustodyType;
use beacon_chain::data_availability_checker::DataAvailabilityChecker;
use beacon_chain::test_utils::{
NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns,
test_da_checker, test_spec,
EphemeralHarnessType, NumBlobs, generate_rand_block_and_blobs,
generate_rand_block_and_data_columns, test_da_checker, test_spec,
};
use bls::Signature;
use lighthouse_network::{
PeerId,
service::api_types::{
BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
DataColumnsByRangeRequestId, DataColumnsByRangeRequester, Id, RangeRequestId,
DataColumnsByRangeRequestId, DataColumnsByRangeRequester, Id,
PayloadEnvelopesByRangeRequestId, RangeRequestId,
},
};
use std::{collections::HashMap, sync::Arc};
use tracing::Span;
use types::{Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock};
use types::{
ChainSpec, DataColumnSidecarList, Epoch, ExecutionPayloadEnvelope, ForkName,
MinimalEthSpec as E, SignedBeaconBlock, SignedExecutionPayloadEnvelope,
};
fn components_id() -> ComponentsByRangeRequestId {
ComponentsByRangeRequestId {
@@ -538,6 +617,15 @@ mod tests {
}
}
fn payloads_id(
parent_request_id: ComponentsByRangeRequestId,
) -> PayloadEnvelopesByRangeRequestId {
PayloadEnvelopesByRangeRequestId {
id: 1,
parent_request_id,
}
}
fn columns_id(
id: Id,
parent_request_id: DataColumnsByRangeRequester,
@@ -555,8 +643,166 @@ mod tests {
info.responses(da_checker, spec).is_some()
}
fn gloas_spec() -> ChainSpec {
let mut spec = test_spec::<E>();
spec.deneb_fork_epoch = Some(Epoch::new(0));
spec.fulu_fork_epoch = Some(Epoch::new(0));
spec.gloas_fork_epoch = Some(Epoch::new(0));
spec
}
fn matching_envelope(block: &SignedBeaconBlock<E>) -> Arc<SignedExecutionPayloadEnvelope<E>> {
let bid = &block
.message()
.body()
.signed_execution_payload_bid()
.expect("Gloas block should have payload bid")
.message;
let mut envelope = SignedExecutionPayloadEnvelope {
message: ExecutionPayloadEnvelope::empty(),
signature: Signature::empty(),
};
envelope.message.beacon_block_root = block.canonical_root();
envelope.message.parent_beacon_block_root = block.parent_root();
envelope.message.builder_index = bid.builder_index;
envelope.message.payload.slot_number = block.slot();
envelope.message.payload.parent_hash = bid.parent_block_hash;
envelope.message.payload.block_hash = bid.block_hash;
Arc::new(envelope)
}
#[allow(clippy::type_complexity)]
fn make_gloas_blocks_and_columns(
count: usize,
spec: &ChainSpec,
) -> Vec<(
Arc<SignedBeaconBlock<E>>,
DataColumnSidecarList<E>,
Arc<SignedExecutionPayloadEnvelope<E>>,
)> {
let mut u = types::test_utils::test_unstructured();
(0..count)
.map(|_| {
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas,
NumBlobs::Number(1),
&mut u,
spec,
)
.unwrap();
let envelope = matching_envelope(&block);
(Arc::new(block), data_columns, envelope)
})
.collect()
}
#[allow(clippy::type_complexity)]
fn add_all_columns(
info: &mut RangeBlockComponentsRequest<E>,
blocks: &[(
Arc<SignedBeaconBlock<E>>,
DataColumnSidecarList<E>,
Arc<SignedExecutionPayloadEnvelope<E>>,
)],
columns_req_id: &[(DataColumnsByRangeRequestId, Vec<u64>)],
expected_custody_columns: &[u64],
) {
for (i, &column_index) in expected_custody_columns.iter().enumerate() {
let (req, _columns) = columns_req_id.get(i).unwrap();
info.add_custody_columns(
*req,
blocks
.iter()
.flat_map(|(_, columns, _)| {
columns
.iter()
.filter(|column| *column.index() == column_index)
.cloned()
})
.collect(),
)
.unwrap();
}
}
#[allow(clippy::type_complexity)]
struct GloasSetup {
info: RangeBlockComponentsRequest<E>,
da_checker: Arc<DataAvailabilityChecker<EphemeralHarnessType<E>>>,
spec: Arc<ChainSpec>,
blocks: Vec<(
Arc<SignedBeaconBlock<E>>,
DataColumnSidecarList<E>,
Arc<SignedExecutionPayloadEnvelope<E>>,
)>,
payloads_req_id: PayloadEnvelopesByRangeRequestId,
expected_custody_columns: Vec<u64>,
}
/// Builds a Gloas coupling request with `count` blocks and all custody columns added,
/// ready for the per-test payload-envelope step.
fn setup_gloas_coupling(count: usize) -> GloasSetup {
let spec = Arc::new(gloas_spec());
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
let expected_custody_columns = da_checker
.custody_context()
.sampling_columns_for_epoch(Epoch::new(0), &spec)
.to_vec();
let blocks = make_gloas_blocks_and_columns(count, &spec);
let components_id = components_id();
let blocks_req_id = blocks_id(components_id);
let payloads_req_id = payloads_id(components_id);
let columns_req_id = expected_custody_columns
.iter()
.enumerate()
.map(|(i, column)| {
(
columns_id(
i as Id,
DataColumnsByRangeRequester::ComponentsByRange(components_id),
),
vec![*column],
)
})
.collect::<Vec<_>>();
let mut info = RangeBlockComponentsRequest::<E>::new(
blocks_req_id,
None,
Some((columns_req_id.clone(), expected_custody_columns.clone())),
Some(payloads_req_id),
Span::none(),
);
info.add_blocks(
blocks_req_id,
blocks.iter().map(|(block, _, _)| block.clone()).collect(),
)
.unwrap();
add_all_columns(
&mut info,
&blocks,
&columns_req_id,
&expected_custody_columns,
);
GloasSetup {
info,
da_checker,
spec,
blocks,
payloads_req_id,
expected_custody_columns,
}
}
#[test]
fn no_blobs_into_responses() {
// This exercises the pre-Gloas blobs/no-data coupling path. Gloas coupling is covered
// by the dedicated `setup_gloas_coupling` tests below.
if skip_under_gloas() {
return;
}
let spec = Arc::new(test_spec::<E>());
let mut u = types::test_utils::test_unstructured();
@@ -575,7 +821,7 @@ mod tests {
let blocks_req_id = blocks_id(components_id());
let mut info =
RangeBlockComponentsRequest::<E>::new(blocks_req_id, None, None, Span::none());
RangeBlockComponentsRequest::<E>::new(blocks_req_id, None, None, None, Span::none());
// Send blocks and complete terminate response
info.add_blocks(blocks_req_id, blocks).unwrap();
@@ -606,6 +852,7 @@ mod tests {
blocks_req_id,
Some(blobs_req_id),
None,
None,
Span::none(),
);
@@ -672,6 +919,7 @@ mod tests {
blocks_req_id,
None,
Some((columns_req_id.clone(), expects_custody_columns.clone())),
None,
Span::none(),
);
// Send blocks and complete terminate response
@@ -751,6 +999,7 @@ mod tests {
blocks_req_id,
None,
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
None,
Span::none(),
);
@@ -804,6 +1053,115 @@ mod tests {
info.responses(da_checker, spec).unwrap().unwrap();
}
#[test]
fn gloas_payload_envelopes_must_complete_before_responses() {
let GloasSetup {
mut info,
da_checker,
spec,
..
} = setup_gloas_coupling(2);
// No payload envelopes added yet, so the request must not be complete.
assert!(info.responses(da_checker, spec).is_none());
}
#[test]
fn gloas_payload_envelopes_are_coupled_by_block_root() {
let GloasSetup {
mut info,
da_checker,
spec,
blocks,
payloads_req_id,
expected_custody_columns,
} = setup_gloas_coupling(2);
// Supply envelopes in reverse order to prove coupling is by block root, not position.
info.add_payload_envelopes(
payloads_req_id,
blocks
.iter()
.rev()
.map(|(_, _, envelope)| envelope.clone())
.collect(),
)
.unwrap();
let responses = info.responses(da_checker, spec).unwrap().unwrap();
assert_eq!(responses.len(), blocks.len());
for response in responses {
match response {
RangeSyncBlock::Gloas {
block,
envelope: Some(envelope),
} => {
assert_eq!(
envelope.envelope().beacon_block_root(),
block.canonical_root()
);
assert_eq!(envelope.columns.len(), expected_custody_columns.len());
}
other => panic!("expected Gloas block with envelope, got {other:?}"),
}
}
}
#[test]
fn gloas_payload_envelopes_allow_missing_envelopes() {
let GloasSetup {
mut info,
da_checker,
spec,
blocks,
payloads_req_id,
..
} = setup_gloas_coupling(2);
// Supply an envelope for only one of the two blocks.
info.add_payload_envelopes(payloads_req_id, vec![blocks[0].2.clone()])
.unwrap();
let responses = info.responses(da_checker, spec).unwrap().unwrap();
let count_with = |with_envelope: bool| {
responses
.iter()
.filter(|response| {
matches!(response, RangeSyncBlock::Gloas { envelope, .. } if envelope.is_some() == with_envelope)
})
.count()
};
assert_eq!(count_with(true), 1);
assert_eq!(count_with(false), 1);
}
#[test]
fn gloas_payload_envelope_mismatch_fails_coupling() {
let GloasSetup {
mut info,
da_checker,
spec,
blocks,
payloads_req_id,
..
} = setup_gloas_coupling(1);
let mut bad_envelope = (*blocks[0].2).clone();
bad_envelope.message.payload.slot_number += 1;
info.add_payload_envelopes(payloads_req_id, vec![Arc::new(bad_envelope)])
.unwrap();
let result = info.responses(da_checker, spec).unwrap();
assert!(
matches!(
result,
Err(super::CouplingError::EnvelopePeerFailure(ref error))
if error.contains("SlotMismatch")
),
"expected envelope slot mismatch, got {result:?}"
);
}
#[test]
fn missing_custody_columns_from_faulty_peers() {
if skip_under_gloas() {
@@ -848,6 +1206,7 @@ mod tests {
blocks_req_id,
None,
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
None,
Span::none(),
);
@@ -949,6 +1308,7 @@ mod tests {
blocks_req_id,
None,
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
None,
Span::none(),
);
@@ -1068,6 +1428,7 @@ mod tests {
blocks_req_id,
None,
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
None,
Span::none(),
);

View File

@@ -57,7 +57,8 @@ use lighthouse_network::service::api_types::{
BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyRequester,
DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId,
DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId,
DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId,
SyncRequestId,
};
use lighthouse_network::types::{NetworkGlobals, SyncState};
use lighthouse_network::{PeerAction, PeerId};
@@ -504,6 +505,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncRequestId::DataColumnsByRange(req_id) => {
self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error))
}
SyncRequestId::PayloadEnvelopesByRange(req_id) => self
.on_payload_envelopes_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)),
}
}
@@ -1160,6 +1163,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
peer_id,
RpcEvent::from_chunk(envelope, seen_timestamp),
),
SyncRequestId::PayloadEnvelopesByRange(req_id) => {
self.on_payload_envelopes_by_range_response(
req_id,
peer_id,
RpcEvent::from_chunk(envelope, seen_timestamp),
);
}
_ => {
crit!(%peer_id, "bad request id for payload envelope");
}
@@ -1214,6 +1224,24 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
fn on_payload_envelopes_by_range_response(
&mut self,
id: PayloadEnvelopesByRangeRequestId,
peer_id: PeerId,
envelope: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) {
if let Some(resp) = self
.network
.on_payload_envelopes_by_range_response(id, peer_id, envelope)
{
self.on_range_components_response(
id.parent_request_id,
peer_id,
RangeBlockComponent::PayloadEnvelope(id, resp),
);
}
}
fn on_data_columns_by_root_response(
&mut self,
req_id: DataColumnsByRootRequestId,

View File

@@ -24,14 +24,17 @@ use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState};
use custody::CustodyRequestResult;
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest};
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, DataColumnsByRangeRequest, PayloadEnvelopesByRangeRequest,
};
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType};
pub use lighthouse_network::service::api_types::RangeRequestId;
use lighthouse_network::service::api_types::{
AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyId, CustodyRequester,
DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId,
DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId,
DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId,
SyncRequestId,
};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource};
use parking_lot::RwLock;
@@ -39,7 +42,7 @@ pub use requests::LookupVerifyError;
use requests::{
ActiveRequests, BlobsByRangeRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems,
DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
PayloadEnvelopesByRootRequestItems,
PayloadEnvelopesByRangeRequestItems, PayloadEnvelopesByRootRequestItems,
};
#[cfg(test)]
use slot_clock::SlotClock;
@@ -217,6 +220,11 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// A mapping of active DataColumnsByRange requests
data_columns_by_range_requests:
ActiveRequests<DataColumnsByRangeRequestId, DataColumnsByRangeRequestItems<T::EthSpec>>,
/// A mapping of active PayloadEnvelopesByRange requests
payload_envelopes_by_range_requests: ActiveRequests<
PayloadEnvelopesByRangeRequestId,
PayloadEnvelopesByRangeRequestItems<T::EthSpec>,
>,
/// Mapping of active custody column requests for a block root
custody_by_root_requests: FnvHashMap<CustodyRequester, ActiveCustodyRequest<T>>,
@@ -254,6 +262,10 @@ pub enum RangeBlockComponent<E: EthSpec> {
DataColumnsByRangeRequestId,
RpcResponseResult<Vec<Arc<DataColumnSidecar<E>>>>,
),
PayloadEnvelope(
PayloadEnvelopesByRangeRequestId,
RpcResponseResult<Vec<Arc<SignedExecutionPayloadEnvelope<E>>>>,
),
}
#[cfg(test)]
@@ -302,6 +314,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
blocks_by_range_requests: ActiveRequests::new("blocks_by_range"),
blobs_by_range_requests: ActiveRequests::new("blobs_by_range"),
data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"),
payload_envelopes_by_range_requests: ActiveRequests::new("payload_envelopes_by_range"),
custody_by_root_requests: <_>::default(),
components_by_range_requests: FnvHashMap::default(),
custody_backfill_data_column_batch_requests: FnvHashMap::default(),
@@ -334,6 +347,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
blocks_by_range_requests,
blobs_by_range_requests,
data_columns_by_range_requests,
payload_envelopes_by_range_requests,
// custody_by_root_requests is a meta request of data_columns_by_root_requests
custody_by_root_requests: _,
// components_by_range_requests is a meta request of various _by_range requests
@@ -369,12 +383,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.active_requests_of_peer(peer_id)
.into_iter()
.map(|req_id| SyncRequestId::DataColumnsByRange(*req_id));
let payload_envelope_by_range_ids = payload_envelopes_by_range_requests
.active_requests_of_peer(peer_id)
.into_iter()
.map(|req_id| SyncRequestId::PayloadEnvelopesByRange(*req_id));
blocks_by_root_ids
.chain(payload_envelopes_by_root_ids)
.chain(data_column_by_root_ids)
.chain(blocks_by_range_ids)
.chain(blobs_by_range_ids)
.chain(data_column_by_range_ids)
.chain(payload_envelope_by_range_ids)
.collect()
}
@@ -431,6 +450,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
blocks_by_range_requests,
blobs_by_range_requests,
data_columns_by_range_requests,
payload_envelopes_by_range_requests,
// custody_by_root_requests is a meta request of data_columns_by_root_requests
custody_by_root_requests: _,
// components_by_range_requests is a meta request of various _by_range requests
@@ -453,6 +473,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.chain(blocks_by_range_requests.iter_request_peers())
.chain(blobs_by_range_requests.iter_request_peers())
.chain(data_columns_by_range_requests.iter_request_peers())
.chain(payload_envelopes_by_range_requests.iter_request_peers())
{
*active_request_count_by_peer.entry(peer_id).or_default() += 1;
}
@@ -585,24 +606,26 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
};
// Attempt to find all required custody peers before sending any request or creating an ID
let columns_by_range_peers_to_request =
if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) {
let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch());
let column_indexes = self
.chain
.sampling_columns_for_epoch(epoch)
.iter()
.cloned()
.collect();
Some(self.select_columns_by_range_peers_to_request(
&column_indexes,
column_peers,
active_request_count_by_peer,
peers_to_deprioritize,
)?)
} else {
None
};
let columns_by_range_peers_to_request = if matches!(
batch_type,
ByRangeRequestType::BlocksAndColumns | ByRangeRequestType::BlocksAndEnvelopesAndColumns
) {
let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch());
let column_indexes = self
.chain
.sampling_columns_for_epoch(epoch)
.iter()
.cloned()
.collect();
Some(self.select_columns_by_range_peers_to_request(
&column_indexes,
column_peers,
active_request_count_by_peer,
peers_to_deprioritize,
)?)
} else {
None
};
// Create the overall components_by_range request ID before its individual components
let id = ComponentsByRangeRequestId {
@@ -666,6 +689,26 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
})
.transpose()?;
let payloads_req_id =
if matches!(batch_type, ByRangeRequestType::BlocksAndEnvelopesAndColumns) {
Some(self.send_payload_envelopes_by_range_request(
block_peer,
PayloadEnvelopesByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
},
id,
new_range_request_span!(
self,
"outgoing_envelopes_by_range",
range_request_span.clone(),
block_peer
),
)?)
} else {
None
};
let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch());
let info = RangeBlockComponentsRequest::new(
blocks_req_id,
@@ -676,6 +719,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.chain.sampling_columns_for_epoch(epoch).to_vec(),
)
}),
payloads_req_id,
range_request_span,
);
self.components_by_range_requests.insert(id, info);
@@ -778,6 +822,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
})
})
}
RangeBlockComponent::PayloadEnvelope(req_id, resp) => {
resp.and_then(|(envelopes, _)| {
request
.add_payload_envelopes(req_id, envelopes)
.map_err(|e| {
RpcResponseError::BlockComponentCouplingError(
CouplingError::InternalError(e),
)
})
})
}
}
} {
entry.remove();
@@ -1269,6 +1324,43 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok((id, requested_columns))
}
fn send_payload_envelopes_by_range_request(
&mut self,
peer_id: PeerId,
request: PayloadEnvelopesByRangeRequest,
parent_request_id: ComponentsByRangeRequestId,
request_span: Span,
) -> Result<PayloadEnvelopesByRangeRequestId, RpcRequestSendError> {
let id = PayloadEnvelopesByRangeRequestId {
id: self.next_id(),
parent_request_id,
};
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: RequestType::PayloadEnvelopesByRange(request.clone()),
app_request_id: AppRequestId::Sync(SyncRequestId::PayloadEnvelopesByRange(id)),
})
.map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "PayloadEnvelopesByRange",
slots = request.count,
peer = %peer_id,
%id,
"Sync RPC request sent"
);
self.payload_envelopes_by_range_requests.insert(
id,
peer_id,
false,
PayloadEnvelopesByRangeRequestItems::new(request),
request_span,
);
Ok(id)
}
pub fn is_execution_engine_online(&self) -> bool {
self.execution_engine_state == EngineState::Online
}
@@ -1349,7 +1441,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
"To deal with alignment with deneb boundaries, batches need to be of just one epoch"
);
if self
if self.chain.spec.fork_name_at_epoch(epoch).gloas_enabled() {
// TODO(gloas): Not precise and we can be post-gloas and not require columns
ByRangeRequestType::BlocksAndEnvelopesAndColumns
} else if self
.chain
.data_availability_checker
.data_columns_required_for_epoch(epoch)
@@ -1485,6 +1580,19 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.on_rpc_response_result(resp, peer_id)
}
#[allow(clippy::type_complexity)]
pub(crate) fn on_payload_envelopes_by_range_response(
&mut self,
id: PayloadEnvelopesByRangeRequestId,
peer_id: PeerId,
rpc_event: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) -> Option<RpcResponseResult<Vec<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>>> {
let resp = self
.payload_envelopes_by_range_requests
.on_response(id, rpc_event);
self.on_rpc_response_result(resp, peer_id)
}
/// Common handler for consistent scoring of RpcResponseError
fn on_rpc_response_result<R>(
&mut self,

View File

@@ -15,6 +15,7 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems;
pub use data_columns_by_root::{
DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest,
};
pub use payload_envelopes_by_range::PayloadEnvelopesByRangeRequestItems;
pub use payload_envelopes_by_root::{
PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest,
};
@@ -28,6 +29,7 @@ mod blocks_by_range;
mod blocks_by_root;
mod data_columns_by_range;
mod data_columns_by_root;
mod payload_envelopes_by_range;
mod payload_envelopes_by_root;
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]

View File

@@ -0,0 +1,48 @@
use super::{ActiveRequestItems, LookupVerifyError};
use lighthouse_network::rpc::methods::PayloadEnvelopesByRangeRequest;
use std::sync::Arc;
use types::{EthSpec, SignedExecutionPayloadEnvelope};
/// Accumulates results of a payload_envelopes_by_range request. Only returns items after
/// receiving the stream termination.
pub struct PayloadEnvelopesByRangeRequestItems<E: EthSpec> {
request: PayloadEnvelopesByRangeRequest,
items: Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
}
impl<E: EthSpec> PayloadEnvelopesByRangeRequestItems<E> {
pub fn new(request: PayloadEnvelopesByRangeRequest) -> Self {
Self {
request,
items: vec![],
}
}
}
impl<E: EthSpec> ActiveRequestItems for PayloadEnvelopesByRangeRequestItems<E> {
type Item = Arc<SignedExecutionPayloadEnvelope<E>>;
fn add(&mut self, envelope: Self::Item) -> Result<bool, LookupVerifyError> {
if envelope.slot().as_u64() < self.request.start_slot
|| envelope.slot().as_u64() >= self.request.start_slot + self.request.count
{
return Err(LookupVerifyError::UnrequestedSlot(envelope.slot()));
}
if self
.items
.iter()
.any(|existing| existing.slot() == envelope.slot())
{
return Err(LookupVerifyError::DuplicatedData(envelope.slot(), 0));
}
self.items.push(envelope);
Ok(self.items.len() >= self.request.count as usize)
}
fn consume(&mut self) -> Vec<Self::Item> {
std::mem::take(&mut self.items)
}
}

View File

@@ -952,6 +952,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
CouplingError::BlobPeerFailure(msg) => {
debug!(?batch_id, msg, "Blob peer failure");
}
CouplingError::EnvelopePeerFailure(msg) => {
debug!(?batch_id, msg, "Envelope peer failure");
}
CouplingError::InternalError(msg) => {
error!(?batch_id, msg, "Block components coupling internal error");
}

View File

@@ -12,6 +12,7 @@ use crate::sync::{
};
use beacon_chain::block_verification_types::LookupBlock;
use beacon_chain::custody_context::NodeCustodyType;
use beacon_chain::payload_envelope_verification::AvailableEnvelope;
use beacon_chain::{
AvailabilityProcessingStatus, EngineState, NotifyExecutionLayer,
block_verification_types::{AsBlock, AvailableBlockData},
@@ -613,7 +614,6 @@ impl TestRig {
.unwrap_or_else(|| {
panic!("Test consumer requested unknown block: {id:?}")
})
.block_data()
.data_columns()
.unwrap_or_else(|| panic!("Block id {id:?} has no columns"));
id.columns
@@ -763,7 +763,7 @@ impl TestRig {
.return_wrong_range_column_indices_n_times -= 1;
let wrong_columns = (req.start_slot..req.start_slot + req.count)
.filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot)))
.filter_map(|block| block.block_data().data_columns())
.filter_map(|block| block.data_columns())
.flat_map(|columns| {
columns
.into_iter()
@@ -787,7 +787,7 @@ impl TestRig {
let wrong_columns = self
.network_blocks_by_slot
.get(&Slot::new(wrong_slot))
.and_then(|block| block.block_data().data_columns())
.and_then(|block| block.data_columns())
.into_iter()
.flat_map(|columns| {
columns
@@ -804,7 +804,7 @@ impl TestRig {
self.complete_strategy.return_partial_range_columns_n_times -= 1;
let columns = (req.start_slot..req.start_slot + req.count)
.filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot)))
.filter_map(|block| block.block_data().data_columns())
.filter_map(|block| block.data_columns())
.flat_map(|columns| {
columns
.into_iter()
@@ -820,7 +820,7 @@ impl TestRig {
let columns = (req.start_slot..req.start_slot + req.count)
.filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot)))
.filter_map(|block| block.block_data().data_columns())
.filter_map(|block| block.data_columns())
.flat_map(|columns| {
columns
.into_iter()
@@ -830,6 +830,25 @@ impl TestRig {
self.send_rpc_columns_response(req_id, peer_id, &columns);
}
(RequestType::PayloadEnvelopesByRange(req), AppRequestId::Sync(req_id)) => {
if self.complete_strategy.skip_by_range_routes {
return;
}
let envelopes = (req.start_slot..req.start_slot + req.count)
.filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot)))
.filter_map(|block| {
let block_root = block.canonical_root();
// Respect a withheld payload envelope.
if self.complete_strategy.hold_envelope_for_block == Some(block_root) {
return None;
}
self.network_envelopes_by_root.get(&block_root).cloned()
})
.collect::<Vec<_>>();
self.send_rpc_envelopes_response(req_id, peer_id, &envelopes);
}
(RequestType::Status(_req), AppRequestId::Router) => {
// Ignore Status requests for now
}
@@ -958,6 +977,34 @@ impl TestRig {
});
}
fn send_rpc_envelopes_response(
&mut self,
sync_request_id: SyncRequestId,
peer_id: PeerId,
envelopes: &[Arc<SignedExecutionPayloadEnvelope<E>>],
) {
let slots = envelopes.iter().map(|e| e.slot()).collect::<Vec<_>>();
self.log(&format!(
"Completing request {sync_request_id:?} to {peer_id} with envelopes {slots:?}"
));
for envelope in envelopes {
self.push_sync_message(SyncMessage::RpcPayloadEnvelope {
sync_request_id,
peer_id,
envelope: Some(envelope.clone()),
seen_timestamp: D,
});
}
// Stream termination
self.push_sync_message(SyncMessage::RpcPayloadEnvelope {
sync_request_id,
peer_id,
envelope: None,
seen_timestamp: D,
});
}
#[allow(dead_code)]
fn is_after_gloas(&self) -> bool {
self.fork_name.gloas_enabled()
@@ -1177,7 +1224,7 @@ impl TestRig {
let range_sync_block = self.get_last_block().clone();
let mut block = (*range_sync_block.block_cloned()).clone();
let blobs = range_sync_block.block_data().blobs();
let columns = range_sync_block.block_data().data_columns();
let columns = range_sync_block.data_columns();
*block.signature_mut() = self.valid_signature();
self.re_insert_block(Arc::new(block), blobs, columns);
}
@@ -1192,10 +1239,7 @@ impl TestRig {
let range_sync_block = self.get_last_block().clone();
let block = range_sync_block.block_cloned();
let blobs = range_sync_block.block_data().blobs();
let mut columns = range_sync_block
.block_data()
.data_columns()
.expect("no columns");
let mut columns = range_sync_block.data_columns().expect("no columns");
let first = columns.first_mut().expect("empty columns");
Arc::make_mut(first)
.signed_block_header_mut()
@@ -1217,10 +1261,7 @@ impl TestRig {
.clone();
let block = range_sync_block.block_cloned();
let blobs = range_sync_block.block_data().blobs();
let mut columns = range_sync_block
.block_data()
.data_columns()
.expect("no columns");
let mut columns = range_sync_block.data_columns().expect("no columns");
let first = columns.first_mut().expect("empty columns");
let column = Arc::make_mut(first);
let proof = column.kzg_proofs_mut().first_mut().expect("no kzg proofs");
@@ -1256,20 +1297,30 @@ impl TestRig {
) {
let block_root = block.canonical_root();
let block_slot = block.slot();
let block_data = if let Some(columns) = columns {
AvailableBlockData::new_with_data_columns(columns)
} else if let Some(blobs) = blobs {
AvailableBlockData::new_with_blobs(blobs)
let range_sync_block = if block.fork_name_unchecked().gloas_enabled() {
// Gloas carries data columns in the payload envelope, not in `block_data`.
let envelope = self
.network_envelopes_by_root
.get(&block_root)
.cloned()
.map(|envelope| AvailableEnvelope::new(envelope, columns.unwrap_or_default()));
RangeSyncBlock::new_gloas(block, envelope).unwrap()
} else {
AvailableBlockData::NoData
let block_data = if let Some(columns) = columns {
AvailableBlockData::new_with_data_columns(columns)
} else if let Some(blobs) = blobs {
AvailableBlockData::new_with_blobs(blobs)
} else {
AvailableBlockData::NoData
};
RangeSyncBlock::new(
block,
block_data,
&self.harness.chain.data_availability_checker,
self.harness.chain.spec.clone(),
)
.unwrap()
};
let range_sync_block = RangeSyncBlock::new(
block,
block_data,
&self.harness.chain.data_availability_checker,
self.harness.chain.spec.clone(),
)
.unwrap();
self.network_blocks_by_slot
.insert(block_slot, range_sync_block.clone());
self.network_blocks_by_root
@@ -1367,7 +1418,6 @@ impl TestRig {
let peer_id = self.new_connected_supernode_peer();
let columns = self
.get_last_block()
.block_data()
.data_columns()
.expect("No data columns");
let column = columns.first().expect("empty columns");