First pass

This commit is contained in:
Pawan Dhananjay
2024-08-29 16:11:19 -07:00
parent 653126f42e
commit 25feedfde3
29 changed files with 262 additions and 130 deletions

View File

@@ -8,7 +8,8 @@ use std::{
sync::Arc,
};
use types::{
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock,
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, EthSpec, Hash256, RuntimeVariableList,
SignedBeaconBlock,
};
#[derive(Debug)]
@@ -31,6 +32,7 @@ pub struct RangeBlockComponentsRequest<E: EthSpec> {
num_custody_column_requests: Option<usize>,
/// The peers the request was made to.
pub(crate) peer_ids: Vec<PeerId>,
max_blobs_per_block: usize,
}
impl<E: EthSpec> RangeBlockComponentsRequest<E> {
@@ -39,6 +41,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
expects_custody_columns: Option<Vec<ColumnIndex>>,
num_custody_column_requests: Option<usize>,
peer_ids: Vec<PeerId>,
max_blobs_per_block: usize,
) -> Self {
Self {
blocks: <_>::default(),
@@ -51,6 +54,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
expects_custody_columns,
num_custody_column_requests,
peer_ids,
max_blobs_per_block,
}
}
@@ -100,7 +104,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
let mut responses = Vec::with_capacity(blocks.len());
let mut blob_iter = blobs.into_iter().peekable();
for block in blocks.into_iter() {
let mut blob_list = Vec::with_capacity(E::max_blobs_per_block());
let mut blob_list = Vec::with_capacity(self.max_blobs_per_block);
while {
let pair_next_blob = blob_iter
.peek()
@@ -111,7 +115,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
blob_list.push(blob_iter.next().ok_or("Missing next blob".to_string())?);
}
let mut blobs_buffer = vec![None; E::max_blobs_per_block()];
let mut blobs_buffer = vec![None; self.max_blobs_per_block];
for blob in blob_list {
let blob_index = blob.index as usize;
let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else {
@@ -123,7 +127,11 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
*blob_opt = Some(blob);
}
}
let blobs = VariableList::from(blobs_buffer.into_iter().flatten().collect::<Vec<_>>());
let blobs = RuntimeVariableList::new(
blobs_buffer.into_iter().flatten().collect::<Vec<_>>(),
self.max_blobs_per_block,
)
.map_err(|_| "Blobs returned exceeds max length".to_string())?;
responses.push(RpcBlock::new(None, block, Some(blobs)).map_err(|e| format!("{e:?}"))?)
}

View File

@@ -1120,6 +1120,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
.network
.range_block_and_blob_response(id, block_or_blob)
{
let epoch = resp.sender_id.batch_id();
match resp.responses {
Ok(blocks) => {
match resp.sender_id {
@@ -1163,6 +1164,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
resp.expects_custody_columns,
None,
vec![],
self.chain.spec.max_blobs_per_block(epoch) as usize,
),
);
// inform range that the request needs to be treated as failed

View File

@@ -36,8 +36,8 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256,
SignedBeaconBlock, Slot,
chain_spec, BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList,
EthSpec, Hash256, SignedBeaconBlock, Slot,
};
pub mod custody;
@@ -61,6 +61,15 @@ pub enum RangeRequestId {
},
}
impl RangeRequestId {
pub fn batch_id(&self) -> BatchId {
match self {
RangeRequestId::RangeSync { batch_id, .. } => *batch_id,
RangeRequestId::BackfillSync { batch_id, .. } => *batch_id,
}
}
}
#[derive(Debug)]
pub enum RpcEvent<T> {
StreamTermination,
@@ -422,11 +431,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
(None, None)
};
// TODO(pawan): this would break if a batch contains multiple epochs
let max_blobs_len = self.chain.spec.max_blobs_per_block(epoch);
let info = RangeBlockComponentsRequest::new(
expected_blobs,
expects_custody_columns,
num_of_custody_column_req,
requested_peers,
max_blobs_len as usize,
);
self.range_block_components_requests
.insert(id, (sender_id, info));
@@ -977,9 +989,16 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
RpcEvent::Response(blob, seen_timestamp) => {
let request = request.get_mut();
match request.add_response(blob) {
Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs)
.map(|blobs| (blobs, seen_timestamp))
.map_err(|e| (e.into(), request.resolve())),
Ok(Some(blobs)) => {
let max_len = if let Some(blob) = blobs.first() {
self.chain.spec.max_blobs_per_block(blob.epoch()) as usize
} else {
6
};
to_fixed_blob_sidecar_list(blobs, max_len)
.map(|blobs| (blobs, seen_timestamp))
.map_err(|e| (e.into(), request.resolve()))
}
Ok(None) => return None,
Err(e) => Err((e.into(), request.resolve())),
}
@@ -1218,8 +1237,11 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
fn to_fixed_blob_sidecar_list<E: EthSpec>(
blobs: Vec<Arc<BlobSidecar<E>>>,
max_len: usize,
) -> Result<FixedBlobSidecarList<E>, LookupVerifyError> {
let mut fixed_list = FixedBlobSidecarList::default();
// TODO(pawan): have a method on fixed runtime vector that just initializes a raw vec with max_len = None
// to signify an empty fixed runtime vector
let mut fixed_list = FixedBlobSidecarList::new(vec![None; max_len]);
for blob in blobs.into_iter() {
let index = blob.index as usize;
*fixed_list