mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-29 19:04:27 +00:00
resolve merge conflicts
This commit is contained in:
@@ -259,7 +259,7 @@ impl TestRig {
|
||||
assert!(beacon_processor.is_ok());
|
||||
let block = next_block_tuple.0;
|
||||
let blob_sidecars = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 {
|
||||
Some(BlobSidecar::build_sidecars(blobs, &block, kzg_proofs).unwrap())
|
||||
Some(BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -344,7 +344,7 @@ impl TestRig {
|
||||
}
|
||||
pub fn enqueue_single_lookup_rpc_blobs(&self) {
|
||||
if let Some(blobs) = self.next_blobs.clone() {
|
||||
let blobs = FixedBlobSidecarList::from(blobs.into_iter().map(Some).collect::<Vec<_>>());
|
||||
let blobs = FixedBlobSidecarList::new(blobs.into_iter().map(Some).collect::<Vec<_>>());
|
||||
self.network_beacon_processor
|
||||
.send_rpc_blobs(
|
||||
self.next_block.canonical_root(),
|
||||
@@ -1130,7 +1130,12 @@ async fn test_blobs_by_range() {
|
||||
.block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None)
|
||||
.unwrap();
|
||||
blob_count += root
|
||||
.map(|root| rig.chain.get_blobs(&root).unwrap_or_default().len())
|
||||
.map(|root| {
|
||||
rig.chain
|
||||
.get_blobs(&root)
|
||||
.map(|list| list.len())
|
||||
.unwrap_or(0)
|
||||
})
|
||||
.unwrap_or(0);
|
||||
}
|
||||
let mut actual_count = 0;
|
||||
|
||||
@@ -2,13 +2,13 @@ use beacon_chain::{
|
||||
block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root,
|
||||
};
|
||||
use lighthouse_network::PeerId;
|
||||
use ssz_types::VariableList;
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
sync::Arc,
|
||||
};
|
||||
use types::{
|
||||
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock,
|
||||
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, EthSpec, Hash256, RuntimeVariableList,
|
||||
SignedBeaconBlock,
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -31,6 +31,7 @@ pub struct RangeBlockComponentsRequest<E: EthSpec> {
|
||||
num_custody_column_requests: Option<usize>,
|
||||
/// The peers the request was made to.
|
||||
pub(crate) peer_ids: Vec<PeerId>,
|
||||
max_blobs_per_block: usize,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
@@ -39,6 +40,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
expects_custody_columns: Option<Vec<ColumnIndex>>,
|
||||
num_custody_column_requests: Option<usize>,
|
||||
peer_ids: Vec<PeerId>,
|
||||
max_blobs_per_block: usize,
|
||||
) -> Self {
|
||||
Self {
|
||||
blocks: <_>::default(),
|
||||
@@ -51,6 +53,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
expects_custody_columns,
|
||||
num_custody_column_requests,
|
||||
peer_ids,
|
||||
max_blobs_per_block,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,7 +103,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
let mut responses = Vec::with_capacity(blocks.len());
|
||||
let mut blob_iter = blobs.into_iter().peekable();
|
||||
for block in blocks.into_iter() {
|
||||
let mut blob_list = Vec::with_capacity(E::max_blobs_per_block());
|
||||
let mut blob_list = Vec::with_capacity(self.max_blobs_per_block);
|
||||
while {
|
||||
let pair_next_blob = blob_iter
|
||||
.peek()
|
||||
@@ -111,7 +114,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
blob_list.push(blob_iter.next().ok_or("Missing next blob".to_string())?);
|
||||
}
|
||||
|
||||
let mut blobs_buffer = vec![None; E::max_blobs_per_block()];
|
||||
let mut blobs_buffer = vec![None; self.max_blobs_per_block];
|
||||
for blob in blob_list {
|
||||
let blob_index = blob.index as usize;
|
||||
let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else {
|
||||
@@ -123,7 +126,11 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
|
||||
*blob_opt = Some(blob);
|
||||
}
|
||||
}
|
||||
let blobs = VariableList::from(blobs_buffer.into_iter().flatten().collect::<Vec<_>>());
|
||||
let blobs = RuntimeVariableList::new(
|
||||
blobs_buffer.into_iter().flatten().collect::<Vec<_>>(),
|
||||
self.max_blobs_per_block,
|
||||
)
|
||||
.map_err(|_| "Blobs returned exceeds max length".to_string())?;
|
||||
responses.push(RpcBlock::new(None, block, Some(blobs)).map_err(|e| format!("{e:?}"))?)
|
||||
}
|
||||
|
||||
@@ -245,12 +252,18 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn no_blobs_into_responses() {
|
||||
let spec = test_spec::<E>();
|
||||
let peer_id = PeerId::random();
|
||||
let mut info = RangeBlockComponentsRequest::<E>::new(false, None, None, vec![peer_id]);
|
||||
let mut rng = XorShiftRng::from_seed([42; 16]);
|
||||
let blocks = (0..4)
|
||||
.map(|_| generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng).0)
|
||||
.map(|_| {
|
||||
generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng, &spec)
|
||||
.0
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let max_len = spec.max_blobs_per_block(blocks.first().unwrap().epoch()) as usize;
|
||||
let mut info =
|
||||
RangeBlockComponentsRequest::<E>::new(false, None, None, vec![peer_id], max_len);
|
||||
|
||||
// Send blocks and complete terminate response
|
||||
for block in blocks {
|
||||
@@ -265,15 +278,24 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn empty_blobs_into_responses() {
|
||||
let spec = test_spec::<E>();
|
||||
let peer_id = PeerId::random();
|
||||
let mut info = RangeBlockComponentsRequest::<E>::new(true, None, None, vec![peer_id]);
|
||||
let mut rng = XorShiftRng::from_seed([42; 16]);
|
||||
let blocks = (0..4)
|
||||
.map(|_| {
|
||||
// Always generate some blobs.
|
||||
generate_rand_block_and_blobs::<E>(ForkName::Deneb, NumBlobs::Number(3), &mut rng).0
|
||||
generate_rand_block_and_blobs::<E>(
|
||||
ForkName::Deneb,
|
||||
NumBlobs::Number(3),
|
||||
&mut rng,
|
||||
&spec,
|
||||
)
|
||||
.0
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let max_len = spec.max_blobs_per_block(blocks.first().unwrap().epoch()) as usize;
|
||||
let mut info =
|
||||
RangeBlockComponentsRequest::<E>::new(true, None, None, vec![peer_id], max_len);
|
||||
|
||||
// Send blocks and complete terminate response
|
||||
for block in blocks {
|
||||
@@ -294,12 +316,7 @@ mod tests {
|
||||
fn rpc_block_with_custody_columns() {
|
||||
let spec = test_spec::<E>();
|
||||
let expects_custody_columns = vec![1, 2, 3, 4];
|
||||
let mut info = RangeBlockComponentsRequest::<E>::new(
|
||||
false,
|
||||
Some(expects_custody_columns.clone()),
|
||||
Some(expects_custody_columns.len()),
|
||||
vec![PeerId::random()],
|
||||
);
|
||||
|
||||
let mut rng = XorShiftRng::from_seed([42; 16]);
|
||||
let blocks = (0..4)
|
||||
.map(|_| {
|
||||
@@ -311,7 +328,14 @@ mod tests {
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let max_len = spec.max_blobs_per_block(blocks.first().unwrap().0.epoch()) as usize;
|
||||
let mut info = RangeBlockComponentsRequest::<E>::new(
|
||||
false,
|
||||
Some(expects_custody_columns.clone()),
|
||||
Some(expects_custody_columns.len()),
|
||||
vec![PeerId::random()],
|
||||
max_len,
|
||||
);
|
||||
// Send blocks and complete terminate response
|
||||
for block in &blocks {
|
||||
info.add_block_response(Some(block.0.clone().into()));
|
||||
@@ -355,12 +379,7 @@ mod tests {
|
||||
let spec = test_spec::<E>();
|
||||
let expects_custody_columns = vec![1, 2, 3, 4];
|
||||
let num_of_data_column_requests = 2;
|
||||
let mut info = RangeBlockComponentsRequest::<E>::new(
|
||||
false,
|
||||
Some(expects_custody_columns.clone()),
|
||||
Some(num_of_data_column_requests),
|
||||
vec![PeerId::random()],
|
||||
);
|
||||
|
||||
let mut rng = XorShiftRng::from_seed([42; 16]);
|
||||
let blocks = (0..4)
|
||||
.map(|_| {
|
||||
@@ -372,7 +391,14 @@ mod tests {
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let max_len = spec.max_blobs_per_block(blocks.first().unwrap().0.epoch()) as usize;
|
||||
let mut info = RangeBlockComponentsRequest::<E>::new(
|
||||
false,
|
||||
Some(expects_custody_columns.clone()),
|
||||
Some(num_of_data_column_requests),
|
||||
vec![PeerId::random()],
|
||||
max_len,
|
||||
);
|
||||
// Send blocks and complete terminate response
|
||||
for block in &blocks {
|
||||
info.add_block_response(Some(block.0.clone().into()));
|
||||
|
||||
@@ -1234,6 +1234,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
.network
|
||||
.range_block_and_blob_response(id, block_or_blob)
|
||||
{
|
||||
let epoch = resp.sender_id.batch_id();
|
||||
match resp.responses {
|
||||
Ok(blocks) => {
|
||||
match resp.sender_id {
|
||||
@@ -1277,6 +1278,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
resp.expects_custody_columns,
|
||||
None,
|
||||
vec![],
|
||||
self.chain.spec.max_blobs_per_block(epoch) as usize,
|
||||
),
|
||||
);
|
||||
// inform range that the request needs to be treated as failed
|
||||
|
||||
@@ -67,6 +67,15 @@ pub enum RangeRequestId {
|
||||
},
|
||||
}
|
||||
|
||||
impl RangeRequestId {
|
||||
pub fn batch_id(&self) -> BatchId {
|
||||
match self {
|
||||
RangeRequestId::RangeSync { batch_id, .. } => *batch_id,
|
||||
RangeRequestId::BackfillSync { batch_id, .. } => *batch_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum RpcEvent<T> {
|
||||
StreamTermination,
|
||||
@@ -445,11 +454,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
// TODO(pawan): this would break if a batch contains multiple epochs
|
||||
let max_blobs_len = self.chain.spec.max_blobs_per_block(epoch);
|
||||
let info = RangeBlockComponentsRequest::new(
|
||||
expected_blobs,
|
||||
expects_columns,
|
||||
num_of_column_req,
|
||||
requested_peers,
|
||||
max_blobs_len as usize,
|
||||
);
|
||||
self.range_block_components_requests
|
||||
.insert(id, (sender_id, info));
|
||||
@@ -950,12 +962,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
) -> Option<RpcResponseResult<FixedBlobSidecarList<T::EthSpec>>> {
|
||||
let response = self.blobs_by_root_requests.on_response(id, rpc_event);
|
||||
let response = response.map(|res| {
|
||||
res.and_then(
|
||||
|(blobs, seen_timestamp)| match to_fixed_blob_sidecar_list(blobs) {
|
||||
res.and_then(|(blobs, seen_timestamp)| {
|
||||
let max_len = if let Some(blob) = blobs.first() {
|
||||
self.chain.spec.max_blobs_per_block(blob.epoch()) as usize
|
||||
} else {
|
||||
6
|
||||
};
|
||||
match to_fixed_blob_sidecar_list(blobs, max_len) {
|
||||
Ok(blobs) => Ok((blobs, seen_timestamp)),
|
||||
Err(e) => Err(e.into()),
|
||||
},
|
||||
)
|
||||
}
|
||||
})
|
||||
});
|
||||
if let Some(Err(RpcResponseError::VerifyError(e))) = &response {
|
||||
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
|
||||
@@ -1150,8 +1167,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
|
||||
fn to_fixed_blob_sidecar_list<E: EthSpec>(
|
||||
blobs: Vec<Arc<BlobSidecar<E>>>,
|
||||
max_len: usize,
|
||||
) -> Result<FixedBlobSidecarList<E>, LookupVerifyError> {
|
||||
let mut fixed_list = FixedBlobSidecarList::default();
|
||||
let mut fixed_list = FixedBlobSidecarList::new(vec![None; max_len]);
|
||||
for blob in blobs.into_iter() {
|
||||
let index = blob.index as usize;
|
||||
*fixed_list
|
||||
|
||||
@@ -119,6 +119,8 @@ impl TestRig {
|
||||
.network_globals
|
||||
.set_sync_state(SyncState::Synced);
|
||||
|
||||
let spec = chain.spec.clone();
|
||||
|
||||
let rng = XorShiftRng::from_seed([42; 16]);
|
||||
TestRig {
|
||||
beacon_processor_rx,
|
||||
@@ -142,6 +144,7 @@ impl TestRig {
|
||||
harness,
|
||||
fork_name,
|
||||
log,
|
||||
spec,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -213,7 +216,7 @@ impl TestRig {
|
||||
) -> (SignedBeaconBlock<E>, Vec<BlobSidecar<E>>) {
|
||||
let fork_name = self.fork_name;
|
||||
let rng = &mut self.rng;
|
||||
generate_rand_block_and_blobs::<E>(fork_name, num_blobs, rng)
|
||||
generate_rand_block_and_blobs::<E>(fork_name, num_blobs, rng, &self.spec)
|
||||
}
|
||||
|
||||
fn rand_block_and_data_columns(
|
||||
@@ -1328,8 +1331,10 @@ impl TestRig {
|
||||
|
||||
#[test]
|
||||
fn stable_rng() {
|
||||
let spec = types::MainnetEthSpec::default_spec();
|
||||
let mut rng = XorShiftRng::from_seed([42; 16]);
|
||||
let (block, _) = generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng);
|
||||
let (block, _) =
|
||||
generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng, &spec);
|
||||
assert_eq!(
|
||||
block.canonical_root(),
|
||||
Hash256::from_slice(
|
||||
@@ -2187,8 +2192,8 @@ mod deneb_only {
|
||||
block_verification_types::{AsBlock, RpcBlock},
|
||||
data_availability_checker::AvailabilityCheckError,
|
||||
};
|
||||
use ssz_types::VariableList;
|
||||
use std::collections::VecDeque;
|
||||
use types::RuntimeVariableList;
|
||||
|
||||
struct DenebTester {
|
||||
rig: TestRig,
|
||||
@@ -2546,12 +2551,15 @@ mod deneb_only {
|
||||
fn parent_block_unknown_parent(mut self) -> Self {
|
||||
self.rig.log("parent_block_unknown_parent");
|
||||
let block = self.unknown_parent_block.take().unwrap();
|
||||
let max_len = self.rig.spec.max_blobs_per_block(block.epoch()) as usize;
|
||||
// Now this block is the one we expect requests from
|
||||
self.block = block.clone();
|
||||
let block = RpcBlock::new(
|
||||
Some(block.canonical_root()),
|
||||
block,
|
||||
self.unknown_parent_blobs.take().map(VariableList::from),
|
||||
self.unknown_parent_blobs
|
||||
.take()
|
||||
.map(|vec| RuntimeVariableList::from_vec(vec, max_len)),
|
||||
)
|
||||
.unwrap();
|
||||
self.rig.parent_block_processed(
|
||||
|
||||
@@ -12,7 +12,7 @@ use slot_clock::ManualSlotClock;
|
||||
use std::sync::Arc;
|
||||
use store::MemoryStore;
|
||||
use tokio::sync::mpsc;
|
||||
use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E};
|
||||
use types::{test_utils::XorShiftRng, ChainSpec, ForkName, MinimalEthSpec as E};
|
||||
|
||||
mod lookups;
|
||||
mod range;
|
||||
@@ -64,4 +64,5 @@ struct TestRig {
|
||||
rng: XorShiftRng,
|
||||
fork_name: ForkName,
|
||||
log: Logger,
|
||||
spec: Arc<ChainSpec>,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user