mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-03 00:31:50 +00:00
Track request IDs in RangeBlockComponentsRequest (#6998)
Part of - https://github.com/sigp/lighthouse/issues/6258 `RangeBlockComponentsRequest` handles a set of by_range requests. It's quite lose on these requests, not tracking them by ID. We want to implement individual request retries, so we must make `RangeBlockComponentsRequest` aware of its requests IDs. We don't want the result of a prior by_range request to affect the state of a future retry. Lookup sync uses this mechanism. Now `RangeBlockComponentsRequest` tracks: ```rust pub struct RangeBlockComponentsRequest<E: EthSpec> { blocks_request: ByRangeRequest<BlocksByRangeRequestId, Vec<Arc<SignedBeaconBlock<E>>>>, block_data_request: RangeBlockDataRequest<E>, } enum RangeBlockDataRequest<E: EthSpec> { NoData, Blobs(ByRangeRequest<BlobsByRangeRequestId, Vec<Arc<BlobSidecar<E>>>>), DataColumns { requests: HashMap< DataColumnsByRangeRequestId, ByRangeRequest<DataColumnsByRangeRequestId, DataColumnSidecarList<E>>, >, expected_custody_columns: Vec<ColumnIndex>, }, } enum ByRangeRequest<I: PartialEq + std::fmt::Display, T> { Active(I), Complete(T), } ``` I have merged `is_finished` and `Into_responses` into the same function. Otherwise, we need to duplicate the logic to figure out if the requests are done.
This commit is contained in:
@@ -1,89 +1,156 @@
|
||||
use beacon_chain::{
|
||||
block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root,
|
||||
};
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
sync::Arc,
|
||||
use lighthouse_network::service::api_types::{
|
||||
BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId,
|
||||
};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use types::{
|
||||
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, EthSpec, Hash256, RuntimeVariableList,
|
||||
SignedBeaconBlock,
|
||||
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
|
||||
Hash256, RuntimeVariableList, SignedBeaconBlock,
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RangeBlockComponentsRequest<E: EthSpec> {
|
||||
/// Blocks we have received awaiting for their corresponding sidecar.
|
||||
blocks: VecDeque<Arc<SignedBeaconBlock<E>>>,
|
||||
blocks_request: ByRangeRequest<BlocksByRangeRequestId, Vec<Arc<SignedBeaconBlock<E>>>>,
|
||||
/// Sidecars we have received awaiting for their corresponding block.
|
||||
blobs: VecDeque<Arc<BlobSidecar<E>>>,
|
||||
data_columns: VecDeque<Arc<DataColumnSidecar<E>>>,
|
||||
/// Whether the individual RPC request for blocks is finished or not.
|
||||
is_blocks_stream_terminated: bool,
|
||||
/// Whether the individual RPC request for sidecars is finished or not.
|
||||
is_sidecars_stream_terminated: bool,
|
||||
custody_columns_streams_terminated: usize,
|
||||
/// Used to determine if this accumulator should wait for a sidecars stream termination
|
||||
expects_blobs: bool,
|
||||
expects_custody_columns: Option<Vec<ColumnIndex>>,
|
||||
/// Used to determine if the number of data columns stream termination this accumulator should
|
||||
/// wait for. This may be less than the number of `expects_custody_columns` due to request batching.
|
||||
num_custody_column_requests: Option<usize>,
|
||||
block_data_request: RangeBlockDataRequest<E>,
|
||||
}
|
||||
|
||||
enum ByRangeRequest<I: PartialEq + std::fmt::Display, T> {
|
||||
Active(I),
|
||||
Complete(T),
|
||||
}
|
||||
|
||||
enum RangeBlockDataRequest<E: EthSpec> {
|
||||
NoData,
|
||||
Blobs(ByRangeRequest<BlobsByRangeRequestId, Vec<Arc<BlobSidecar<E>>>>),
|
||||
DataColumns {
|
||||
requests: HashMap<
|
||||
DataColumnsByRangeRequestId,
|
||||
ByRangeRequest<DataColumnsByRangeRequestId, DataColumnSidecarList<E>>,
|
||||
>,
|
||||
expected_custody_columns: Vec<ColumnIndex>,
|
||||
},
|
||||
}
|
||||
|
||||
impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
pub fn new(
|
||||
expects_blobs: bool,
|
||||
expects_custody_columns: Option<Vec<ColumnIndex>>,
|
||||
num_custody_column_requests: Option<usize>,
|
||||
blocks_req_id: BlocksByRangeRequestId,
|
||||
blobs_req_id: Option<BlobsByRangeRequestId>,
|
||||
data_columns: Option<(Vec<DataColumnsByRangeRequestId>, Vec<ColumnIndex>)>,
|
||||
) -> Self {
|
||||
Self {
|
||||
blocks: <_>::default(),
|
||||
blobs: <_>::default(),
|
||||
data_columns: <_>::default(),
|
||||
is_blocks_stream_terminated: false,
|
||||
is_sidecars_stream_terminated: false,
|
||||
custody_columns_streams_terminated: 0,
|
||||
expects_blobs,
|
||||
expects_custody_columns,
|
||||
num_custody_column_requests,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_blocks(&mut self, blocks: Vec<Arc<SignedBeaconBlock<E>>>) {
|
||||
for block in blocks {
|
||||
self.blocks.push_back(block);
|
||||
}
|
||||
self.is_blocks_stream_terminated = true;
|
||||
}
|
||||
|
||||
pub fn add_blobs(&mut self, blobs: Vec<Arc<BlobSidecar<E>>>) {
|
||||
for blob in blobs {
|
||||
self.blobs.push_back(blob);
|
||||
}
|
||||
self.is_sidecars_stream_terminated = true;
|
||||
}
|
||||
|
||||
pub fn add_custody_columns(&mut self, columns: Vec<Arc<DataColumnSidecar<E>>>) {
|
||||
for column in columns {
|
||||
self.data_columns.push_back(column);
|
||||
}
|
||||
// TODO(das): this mechanism is dangerous, if somehow there are two requests for the
|
||||
// same column index it can terminate early. This struct should track that all requests
|
||||
// for all custody columns terminate.
|
||||
self.custody_columns_streams_terminated += 1;
|
||||
}
|
||||
|
||||
pub fn into_responses(self, spec: &ChainSpec) -> Result<Vec<RpcBlock<E>>, String> {
|
||||
if let Some(expects_custody_columns) = self.expects_custody_columns.clone() {
|
||||
self.into_responses_with_custody_columns(expects_custody_columns, spec)
|
||||
let block_data_request = if let Some(blobs_req_id) = blobs_req_id {
|
||||
RangeBlockDataRequest::Blobs(ByRangeRequest::Active(blobs_req_id))
|
||||
} else if let Some((requests, expected_custody_columns)) = data_columns {
|
||||
RangeBlockDataRequest::DataColumns {
|
||||
requests: requests
|
||||
.into_iter()
|
||||
.map(|id| (id, ByRangeRequest::Active(id)))
|
||||
.collect(),
|
||||
expected_custody_columns,
|
||||
}
|
||||
} else {
|
||||
self.into_responses_with_blobs(spec)
|
||||
RangeBlockDataRequest::NoData
|
||||
};
|
||||
|
||||
Self {
|
||||
blocks_request: ByRangeRequest::Active(blocks_req_id),
|
||||
block_data_request,
|
||||
}
|
||||
}
|
||||
|
||||
fn into_responses_with_blobs(self, spec: &ChainSpec) -> Result<Vec<RpcBlock<E>>, String> {
|
||||
let RangeBlockComponentsRequest { blocks, blobs, .. } = self;
|
||||
pub fn add_blocks(
|
||||
&mut self,
|
||||
req_id: BlocksByRangeRequestId,
|
||||
blocks: Vec<Arc<SignedBeaconBlock<E>>>,
|
||||
) -> Result<(), String> {
|
||||
self.blocks_request.finish(req_id, blocks)
|
||||
}
|
||||
|
||||
pub fn add_blobs(
|
||||
&mut self,
|
||||
req_id: BlobsByRangeRequestId,
|
||||
blobs: Vec<Arc<BlobSidecar<E>>>,
|
||||
) -> Result<(), String> {
|
||||
match &mut self.block_data_request {
|
||||
RangeBlockDataRequest::NoData => Err("received blobs but expected no data".to_owned()),
|
||||
RangeBlockDataRequest::Blobs(ref mut req) => req.finish(req_id, blobs),
|
||||
RangeBlockDataRequest::DataColumns { .. } => {
|
||||
Err("received blobs but expected data columns".to_owned())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_custody_columns(
|
||||
&mut self,
|
||||
req_id: DataColumnsByRangeRequestId,
|
||||
columns: Vec<Arc<DataColumnSidecar<E>>>,
|
||||
) -> Result<(), String> {
|
||||
match &mut self.block_data_request {
|
||||
RangeBlockDataRequest::NoData => {
|
||||
Err("received data columns but expected no data".to_owned())
|
||||
}
|
||||
RangeBlockDataRequest::Blobs(_) => {
|
||||
Err("received data columns but expected blobs".to_owned())
|
||||
}
|
||||
RangeBlockDataRequest::DataColumns {
|
||||
ref mut requests, ..
|
||||
} => {
|
||||
let req = requests
|
||||
.get_mut(&req_id)
|
||||
.ok_or(format!("unknown data columns by range req_id {req_id}"))?;
|
||||
req.finish(req_id, columns)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn responses(&self, spec: &ChainSpec) -> Option<Result<Vec<RpcBlock<E>>, String>> {
|
||||
let Some(blocks) = self.blocks_request.to_finished() else {
|
||||
return None;
|
||||
};
|
||||
|
||||
match &self.block_data_request {
|
||||
RangeBlockDataRequest::NoData => {
|
||||
Some(Self::responses_with_blobs(blocks.to_vec(), vec![], spec))
|
||||
}
|
||||
RangeBlockDataRequest::Blobs(request) => {
|
||||
let Some(blobs) = request.to_finished() else {
|
||||
return None;
|
||||
};
|
||||
Some(Self::responses_with_blobs(
|
||||
blocks.to_vec(),
|
||||
blobs.to_vec(),
|
||||
spec,
|
||||
))
|
||||
}
|
||||
RangeBlockDataRequest::DataColumns {
|
||||
requests,
|
||||
expected_custody_columns,
|
||||
} => {
|
||||
let mut data_columns = vec![];
|
||||
for req in requests.values() {
|
||||
let Some(data) = req.to_finished() else {
|
||||
return None;
|
||||
};
|
||||
data_columns.extend(data.clone())
|
||||
}
|
||||
|
||||
Some(Self::responses_with_custody_columns(
|
||||
blocks.to_vec(),
|
||||
data_columns,
|
||||
expected_custody_columns,
|
||||
spec,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn responses_with_blobs(
|
||||
blocks: Vec<Arc<SignedBeaconBlock<E>>>,
|
||||
blobs: Vec<Arc<BlobSidecar<E>>>,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<Vec<RpcBlock<E>>, String> {
|
||||
// There can't be more more blobs than blocks. i.e. sending any blob (empty
|
||||
// included) for a skipped slot is not permitted.
|
||||
let mut responses = Vec::with_capacity(blocks.len());
|
||||
@@ -129,17 +196,12 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
Ok(responses)
|
||||
}
|
||||
|
||||
fn into_responses_with_custody_columns(
|
||||
self,
|
||||
expects_custody_columns: Vec<ColumnIndex>,
|
||||
fn responses_with_custody_columns(
|
||||
blocks: Vec<Arc<SignedBeaconBlock<E>>>,
|
||||
data_columns: DataColumnSidecarList<E>,
|
||||
expects_custody_columns: &[ColumnIndex],
|
||||
spec: &ChainSpec,
|
||||
) -> Result<Vec<RpcBlock<E>>, String> {
|
||||
let RangeBlockComponentsRequest {
|
||||
blocks,
|
||||
data_columns,
|
||||
..
|
||||
} = self;
|
||||
|
||||
// Group data columns by block_root and index
|
||||
let mut data_columns_by_block =
|
||||
HashMap::<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>::new();
|
||||
@@ -177,7 +239,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
};
|
||||
|
||||
let mut custody_columns = vec![];
|
||||
for index in &expects_custody_columns {
|
||||
for index in expects_custody_columns {
|
||||
let Some(data_column) = data_columns_by_index.remove(index) else {
|
||||
return Err(format!("No column for block {block_root:?} index {index}"));
|
||||
};
|
||||
@@ -210,20 +272,27 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
|
||||
Ok(rpc_blocks)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_finished(&self) -> bool {
|
||||
if !self.is_blocks_stream_terminated {
|
||||
return false;
|
||||
}
|
||||
if self.expects_blobs && !self.is_sidecars_stream_terminated {
|
||||
return false;
|
||||
}
|
||||
if let Some(expects_custody_column_responses) = self.num_custody_column_requests {
|
||||
if self.custody_columns_streams_terminated < expects_custody_column_responses {
|
||||
return false;
|
||||
impl<I: PartialEq + std::fmt::Display, T> ByRangeRequest<I, T> {
|
||||
fn finish(&mut self, id: I, data: T) -> Result<(), String> {
|
||||
match self {
|
||||
Self::Active(expected_id) => {
|
||||
if expected_id != &id {
|
||||
return Err(format!("unexpected req_id expected {expected_id} got {id}"));
|
||||
}
|
||||
*self = Self::Complete(data);
|
||||
Ok(())
|
||||
}
|
||||
Self::Complete(_) => Err("request already complete".to_owned()),
|
||||
}
|
||||
}
|
||||
|
||||
fn to_finished(&self) -> Option<&T> {
|
||||
match self {
|
||||
Self::Active(_) => None,
|
||||
Self::Complete(data) => Some(data),
|
||||
}
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -233,9 +302,52 @@ mod tests {
|
||||
use beacon_chain::test_utils::{
|
||||
generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, NumBlobs,
|
||||
};
|
||||
use lighthouse_network::service::api_types::{
|
||||
BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
|
||||
DataColumnsByRangeRequestId, Id, RangeRequestId,
|
||||
};
|
||||
use rand::SeedableRng;
|
||||
use std::sync::Arc;
|
||||
use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E, SignedBeaconBlock};
|
||||
use types::{test_utils::XorShiftRng, Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock};
|
||||
|
||||
fn components_id() -> ComponentsByRangeRequestId {
|
||||
ComponentsByRangeRequestId {
|
||||
id: 0,
|
||||
requester: RangeRequestId::RangeSync {
|
||||
chain_id: 1,
|
||||
batch_id: Epoch::new(0),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn blocks_id(parent_request_id: ComponentsByRangeRequestId) -> BlocksByRangeRequestId {
|
||||
BlocksByRangeRequestId {
|
||||
id: 1,
|
||||
parent_request_id,
|
||||
}
|
||||
}
|
||||
|
||||
fn blobs_id(parent_request_id: ComponentsByRangeRequestId) -> BlobsByRangeRequestId {
|
||||
BlobsByRangeRequestId {
|
||||
id: 1,
|
||||
parent_request_id,
|
||||
}
|
||||
}
|
||||
|
||||
fn columns_id(
|
||||
id: Id,
|
||||
parent_request_id: ComponentsByRangeRequestId,
|
||||
) -> DataColumnsByRangeRequestId {
|
||||
DataColumnsByRangeRequestId {
|
||||
id,
|
||||
parent_request_id,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_finished(info: &RangeBlockComponentsRequest<E>) -> bool {
|
||||
let spec = test_spec::<E>();
|
||||
info.responses(&spec).is_some()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn no_blobs_into_responses() {
|
||||
@@ -248,14 +360,15 @@ mod tests {
|
||||
.into()
|
||||
})
|
||||
.collect::<Vec<Arc<SignedBeaconBlock<E>>>>();
|
||||
let mut info = RangeBlockComponentsRequest::<E>::new(false, None, None);
|
||||
|
||||
let blocks_req_id = blocks_id(components_id());
|
||||
let mut info = RangeBlockComponentsRequest::<E>::new(blocks_req_id, None, None);
|
||||
|
||||
// Send blocks and complete terminate response
|
||||
info.add_blocks(blocks);
|
||||
info.add_blocks(blocks_req_id, blocks).unwrap();
|
||||
|
||||
// Assert response is finished and RpcBlocks can be constructed
|
||||
assert!(info.is_finished());
|
||||
info.into_responses(&test_spec::<E>()).unwrap();
|
||||
info.responses(&test_spec::<E>()).unwrap().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -275,18 +388,22 @@ mod tests {
|
||||
.into()
|
||||
})
|
||||
.collect::<Vec<Arc<SignedBeaconBlock<E>>>>();
|
||||
let mut info = RangeBlockComponentsRequest::<E>::new(true, None, None);
|
||||
|
||||
let components_id = components_id();
|
||||
let blocks_req_id = blocks_id(components_id);
|
||||
let blobs_req_id = blobs_id(components_id);
|
||||
let mut info =
|
||||
RangeBlockComponentsRequest::<E>::new(blocks_req_id, Some(blobs_req_id), None);
|
||||
|
||||
// Send blocks and complete terminate response
|
||||
info.add_blocks(blocks);
|
||||
info.add_blocks(blocks_req_id, blocks).unwrap();
|
||||
// Expect no blobs returned
|
||||
info.add_blobs(vec![]);
|
||||
info.add_blobs(blobs_req_id, vec![]).unwrap();
|
||||
|
||||
// Assert response is finished and RpcBlocks can be constructed, even if blobs weren't returned.
|
||||
// This makes sure we don't expect blobs here when they have expired. Checking this logic should
|
||||
// be hendled elsewhere.
|
||||
assert!(info.is_finished());
|
||||
info.into_responses(&test_spec::<E>()).unwrap();
|
||||
info.responses(&test_spec::<E>()).unwrap().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -304,40 +421,49 @@ mod tests {
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let components_id = components_id();
|
||||
let blocks_req_id = blocks_id(components_id);
|
||||
let columns_req_id = expects_custody_columns
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, _)| columns_id(i as Id, components_id))
|
||||
.collect::<Vec<_>>();
|
||||
let mut info = RangeBlockComponentsRequest::<E>::new(
|
||||
false,
|
||||
Some(expects_custody_columns.clone()),
|
||||
Some(expects_custody_columns.len()),
|
||||
blocks_req_id,
|
||||
None,
|
||||
Some((columns_req_id.clone(), expects_custody_columns.clone())),
|
||||
);
|
||||
// Send blocks and complete terminate response
|
||||
info.add_blocks(blocks.iter().map(|b| b.0.clone().into()).collect());
|
||||
info.add_blocks(
|
||||
blocks_req_id,
|
||||
blocks.iter().map(|b| b.0.clone().into()).collect(),
|
||||
)
|
||||
.unwrap();
|
||||
// Assert response is not finished
|
||||
assert!(!info.is_finished());
|
||||
assert!(!is_finished(&info));
|
||||
|
||||
// Send data columns
|
||||
for (i, &column_index) in expects_custody_columns.iter().enumerate() {
|
||||
info.add_custody_columns(
|
||||
columns_req_id.get(i).copied().unwrap(),
|
||||
blocks
|
||||
.iter()
|
||||
.flat_map(|b| b.1.iter().filter(|d| d.index == column_index).cloned())
|
||||
.collect(),
|
||||
);
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
if i < expects_custody_columns.len() - 1 {
|
||||
assert!(
|
||||
!info.is_finished(),
|
||||
!is_finished(&info),
|
||||
"requested should not be finished at loop {i}"
|
||||
);
|
||||
} else {
|
||||
assert!(
|
||||
info.is_finished(),
|
||||
"request should be finishied at loop {i}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// All completed construct response
|
||||
info.into_responses(&spec).unwrap();
|
||||
info.responses(&spec).unwrap().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -353,10 +479,18 @@ mod tests {
|
||||
(0..batched_column_requests.len() as u32).collect::<Vec<_>>();
|
||||
let num_of_data_column_requests = custody_column_request_ids.len();
|
||||
|
||||
let components_id = components_id();
|
||||
let blocks_req_id = blocks_id(components_id);
|
||||
let columns_req_id = batched_column_requests
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, _)| columns_id(i as Id, components_id))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut info = RangeBlockComponentsRequest::<E>::new(
|
||||
false,
|
||||
Some(expects_custody_columns.clone()),
|
||||
Some(num_of_data_column_requests),
|
||||
blocks_req_id,
|
||||
None,
|
||||
Some((columns_req_id.clone(), expects_custody_columns.clone())),
|
||||
);
|
||||
|
||||
let mut rng = XorShiftRng::from_seed([42; 16]);
|
||||
@@ -372,13 +506,18 @@ mod tests {
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Send blocks and complete terminate response
|
||||
info.add_blocks(blocks.iter().map(|b| b.0.clone().into()).collect());
|
||||
info.add_blocks(
|
||||
blocks_req_id,
|
||||
blocks.iter().map(|b| b.0.clone().into()).collect(),
|
||||
)
|
||||
.unwrap();
|
||||
// Assert response is not finished
|
||||
assert!(!info.is_finished());
|
||||
assert!(!is_finished(&info));
|
||||
|
||||
for (i, column_indices) in batched_column_requests.iter().enumerate() {
|
||||
// Send the set of columns in the same batch request
|
||||
info.add_custody_columns(
|
||||
columns_req_id.get(i).copied().unwrap(),
|
||||
blocks
|
||||
.iter()
|
||||
.flat_map(|b| {
|
||||
@@ -387,19 +526,18 @@ mod tests {
|
||||
.cloned()
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
);
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
if i < num_of_data_column_requests - 1 {
|
||||
assert!(
|
||||
!info.is_finished(),
|
||||
!is_finished(&info),
|
||||
"requested should not be finished at loop {i}"
|
||||
);
|
||||
} else {
|
||||
assert!(info.is_finished(), "request should be finished at loop {i}");
|
||||
}
|
||||
}
|
||||
|
||||
// All completed construct response
|
||||
info.into_responses(&spec).unwrap();
|
||||
info.responses(&spec).unwrap().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1167,7 +1167,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
self.on_range_components_response(
|
||||
id.parent_request_id,
|
||||
peer_id,
|
||||
RangeBlockComponent::Block(resp),
|
||||
RangeBlockComponent::Block(id, resp),
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1182,7 +1182,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
self.on_range_components_response(
|
||||
id.parent_request_id,
|
||||
peer_id,
|
||||
RangeBlockComponent::Blob(resp),
|
||||
RangeBlockComponent::Blob(id, resp),
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1200,7 +1200,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
self.on_range_components_response(
|
||||
id.parent_request_id,
|
||||
peer_id,
|
||||
RangeBlockComponent::CustodyColumns(resp),
|
||||
RangeBlockComponent::CustodyColumns(id, resp),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -218,9 +218,18 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
|
||||
|
||||
/// Small enumeration to make dealing with block and blob requests easier.
|
||||
pub enum RangeBlockComponent<E: EthSpec> {
|
||||
Block(RpcResponseResult<Vec<Arc<SignedBeaconBlock<E>>>>),
|
||||
Blob(RpcResponseResult<Vec<Arc<BlobSidecar<E>>>>),
|
||||
CustodyColumns(RpcResponseResult<Vec<Arc<DataColumnSidecar<E>>>>),
|
||||
Block(
|
||||
BlocksByRangeRequestId,
|
||||
RpcResponseResult<Vec<Arc<SignedBeaconBlock<E>>>>,
|
||||
),
|
||||
Blob(
|
||||
BlobsByRangeRequestId,
|
||||
RpcResponseResult<Vec<Arc<BlobSidecar<E>>>>,
|
||||
),
|
||||
CustodyColumns(
|
||||
DataColumnsByRangeRequestId,
|
||||
RpcResponseResult<Vec<Arc<DataColumnSidecar<E>>>>,
|
||||
),
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
@@ -386,7 +395,16 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
requester,
|
||||
};
|
||||
|
||||
let _blocks_req_id = self.send_blocks_by_range_request(peer_id, request.clone(), id)?;
|
||||
// Compute custody column peers before sending the blocks_by_range request. If we don't have
|
||||
// enough peers, error here.
|
||||
let data_column_requests = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) {
|
||||
let column_indexes = self.network_globals().sampling_columns.clone();
|
||||
Some(self.make_columns_by_range_requests(request.clone(), &column_indexes)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let blocks_req_id = self.send_blocks_by_range_request(peer_id, request.clone(), id)?;
|
||||
|
||||
let blobs_req_id = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) {
|
||||
Some(self.send_blobs_by_range_request(
|
||||
@@ -401,35 +419,27 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
None
|
||||
};
|
||||
|
||||
let (expects_columns, data_column_requests) =
|
||||
if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) {
|
||||
let column_indexes = self.network_globals().sampling_columns.clone();
|
||||
let data_column_requests = self
|
||||
.make_columns_by_range_requests(request, &column_indexes)?
|
||||
.into_iter()
|
||||
.map(|(peer_id, columns_by_range_request)| {
|
||||
self.send_data_columns_by_range_request(
|
||||
peer_id,
|
||||
columns_by_range_request,
|
||||
id,
|
||||
)
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let data_columns = if let Some(data_column_requests) = data_column_requests {
|
||||
let data_column_requests = data_column_requests
|
||||
.into_iter()
|
||||
.map(|(peer_id, columns_by_range_request)| {
|
||||
self.send_data_columns_by_range_request(peer_id, columns_by_range_request, id)
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
(
|
||||
Some(column_indexes.into_iter().collect::<Vec<_>>()),
|
||||
Some(data_column_requests),
|
||||
)
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
Some((
|
||||
data_column_requests,
|
||||
self.network_globals()
|
||||
.sampling_columns
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>(),
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let expected_blobs = blobs_req_id.is_some();
|
||||
let info = RangeBlockComponentsRequest::new(
|
||||
expected_blobs,
|
||||
expects_columns,
|
||||
data_column_requests.map(|items| items.len()),
|
||||
);
|
||||
let info = RangeBlockComponentsRequest::new(blocks_req_id, blobs_req_id, data_columns);
|
||||
self.components_by_range_requests.insert(id, info);
|
||||
|
||||
Ok(id.id)
|
||||
@@ -484,28 +494,33 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
if let Err(e) = {
|
||||
let request = entry.get_mut();
|
||||
match range_block_component {
|
||||
RangeBlockComponent::Block(resp) => resp.map(|(blocks, _)| {
|
||||
request.add_blocks(blocks);
|
||||
RangeBlockComponent::Block(req_id, resp) => resp.and_then(|(blocks, _)| {
|
||||
request
|
||||
.add_blocks(req_id, blocks)
|
||||
.map_err(RpcResponseError::BlockComponentCouplingError)
|
||||
}),
|
||||
RangeBlockComponent::Blob(resp) => resp.map(|(blobs, _)| {
|
||||
request.add_blobs(blobs);
|
||||
}),
|
||||
RangeBlockComponent::CustodyColumns(resp) => resp.map(|(custody_columns, _)| {
|
||||
request.add_custody_columns(custody_columns);
|
||||
RangeBlockComponent::Blob(req_id, resp) => resp.and_then(|(blobs, _)| {
|
||||
request
|
||||
.add_blobs(req_id, blobs)
|
||||
.map_err(RpcResponseError::BlockComponentCouplingError)
|
||||
}),
|
||||
RangeBlockComponent::CustodyColumns(req_id, resp) => {
|
||||
resp.and_then(|(custody_columns, _)| {
|
||||
request
|
||||
.add_custody_columns(req_id, custody_columns)
|
||||
.map_err(RpcResponseError::BlockComponentCouplingError)
|
||||
})
|
||||
}
|
||||
}
|
||||
} {
|
||||
entry.remove();
|
||||
return Some(Err(e));
|
||||
}
|
||||
|
||||
if entry.get_mut().is_finished() {
|
||||
if let Some(blocks_result) = entry.get().responses(&self.chain.spec) {
|
||||
entry.remove();
|
||||
// If the request is finished, dequeue everything
|
||||
let request = entry.remove();
|
||||
let blocks = request
|
||||
.into_responses(&self.chain.spec)
|
||||
.map_err(RpcResponseError::BlockComponentCouplingError);
|
||||
Some(blocks)
|
||||
Some(blocks_result.map_err(RpcResponseError::BlockComponentCouplingError))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user