Fix rebase conflicts

This commit is contained in:
Emilia Hane
2023-01-26 20:18:59 +01:00
parent 69c30bb6eb
commit 09370e70d9
21 changed files with 196 additions and 128 deletions

View File

@@ -726,7 +726,7 @@ impl<T: BeaconChainTypes> Worker<T> {
let verification_result = self
.chain
.clone()
.verify_block_for_gossip(block.clone().into())
.verify_block_for_gossip(block.clone())
.await;
let block_root = if let Ok(verified_block) = &verification_result {

View File

@@ -282,10 +282,7 @@ impl<T: BeaconChainTypes> Worker<T> {
count_unrealized: CountUnrealized,
notify_execution_layer: NotifyExecutionLayer,
) -> (usize, Result<(), ChainSegmentFailed>) {
let blocks: Vec<_> = downloaded_blocks
.cloned()
.map(|block| block.into())
.collect();
let blocks: Vec<_> = downloaded_blocks.cloned().collect();
match self
.chain
.process_chain_segment(blocks, count_unrealized, notify_execution_layer)

View File

@@ -803,15 +803,15 @@ impl<T: BeaconChainTypes> SyncManager<T> {
peer_id: PeerId,
block_or_blob: BlockOrBlobs<T::EthSpec>,
) {
if let Some((chain_id, batch_id, block_responses)) = self
if let Some((chain_id, resp)) = self
.network
.range_sync_block_and_blob_response(id, block_or_blob)
{
match block_responses {
match resp.responses {
Ok(blocks) => {
for block in blocks
.into_iter()
.map(|block| Some(block))
.map(Some)
// chain the stream terminator
.chain(vec![None])
{
@@ -819,7 +819,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
&mut self.network,
peer_id,
chain_id,
batch_id,
resp.batch_id,
id,
block,
);
@@ -831,7 +831,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// With time we will want to downgrade this log
warn!(
self.log, "Blocks and blobs request for range received invalid data";
"peer_id" => %peer_id, "batch_id" => batch_id, "error" => e
"peer_id" => %peer_id, "batch_id" => resp.batch_id, "error" => e
);
// TODO: penalize the peer for being a bad boy
let id = RequestId::RangeBlobs { id };
@@ -849,21 +849,21 @@ impl<T: BeaconChainTypes> SyncManager<T> {
peer_id: PeerId,
block_or_blob: BlockOrBlobs<T::EthSpec>,
) {
if let Some((batch_id, block_responses)) = self
if let Some(resp) = self
.network
.backfill_sync_block_and_blob_response(id, block_or_blob)
{
match block_responses {
match resp.responses {
Ok(blocks) => {
for block in blocks
.into_iter()
.map(|block| Some(block))
.map(Some)
// chain the stream terminator
.chain(vec![None])
{
match self.backfill_sync.on_block_response(
&mut self.network,
batch_id,
resp.batch_id,
&peer_id,
id,
block,
@@ -883,7 +883,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// With time we will want to downgrade this log
warn!(
self.log, "Blocks and blobs request for backfill received invalid data";
"peer_id" => %peer_id, "batch_id" => batch_id, "error" => e
"peer_id" => %peer_id, "batch_id" => resp.batch_id, "error" => e
);
// TODO: penalize the peer for being a bad boy
let id = RequestId::BackFillBlobs { id };

View File

@@ -20,6 +20,17 @@ use std::sync::Arc;
use tokio::sync::mpsc;
use types::{BlobsSidecar, EthSpec, SignedBeaconBlock};
pub struct BlocksAndBlobsByRangeResponse<T: EthSpec> {
pub batch_id: BatchId,
pub responses: Result<Vec<BlockWrapper<T>>, &'static str>,
}
pub struct BlocksAndBlobsByRangeRequest<T: EthSpec> {
pub chain_id: ChainId,
pub batch_id: BatchId,
pub block_blob_info: BlocksAndBlobsRequestInfo<T>,
}
/// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id.
pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// The network channel to relay messages to the Network service.
@@ -38,8 +49,7 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
backfill_requests: FnvHashMap<Id, BatchId>,
/// BlocksByRange requests paired with BlobsByRange requests made by the range.
range_blocks_and_blobs_requests:
FnvHashMap<Id, (ChainId, BatchId, BlocksAndBlobsRequestInfo<T::EthSpec>)>,
range_blocks_and_blobs_requests: FnvHashMap<Id, BlocksAndBlobsByRangeRequest<T::EthSpec>>,
/// BlocksByRange requests paired with BlobsByRange requests made by the backfill sync.
backfill_blocks_and_blobs_requests:
@@ -198,8 +208,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id,
})?;
let block_blob_info = BlocksAndBlobsRequestInfo::default();
self.range_blocks_and_blobs_requests
.insert(id, (chain_id, batch_id, block_blob_info));
self.range_blocks_and_blobs_requests.insert(
id,
BlocksAndBlobsByRangeRequest {
chain_id,
batch_id,
block_blob_info,
},
);
Ok(id)
}
}
@@ -290,22 +306,30 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
request_id: Id,
block_or_blob: BlockOrBlobs<T::EthSpec>,
) -> Option<(
ChainId,
BatchId,
Result<Vec<BlockWrapper<T::EthSpec>>, &'static str>,
)> {
) -> Option<(ChainId, BlocksAndBlobsByRangeResponse<T::EthSpec>)> {
match self.range_blocks_and_blobs_requests.entry(request_id) {
Entry::Occupied(mut entry) => {
let (_, _, info) = entry.get_mut();
let req = entry.get_mut();
let info = &mut req.block_blob_info;
match block_or_blob {
BlockOrBlobs::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlobs::Blobs(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
}
if info.is_finished() {
// If the request is finished, dequeue everything
let (chain_id, batch_id, info) = entry.remove();
Some((chain_id, batch_id, info.into_responses()))
let BlocksAndBlobsByRangeRequest {
chain_id,
batch_id,
block_blob_info,
} = entry.remove();
Some((
chain_id,
BlocksAndBlobsByRangeResponse {
batch_id,
responses: block_blob_info.into_responses(),
},
))
} else {
None
}
@@ -323,7 +347,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
ByRangeRequestType::BlocksAndBlobs => self
.range_blocks_and_blobs_requests
.remove(&request_id)
.map(|(chain_id, batch_id, _info)| (chain_id, batch_id)),
.map(|req| (req.chain_id, req.batch_id)),
ByRangeRequestType::Blocks => self.range_requests.remove(&request_id),
}
}
@@ -349,20 +373,19 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
is_stream_terminator: bool,
) -> Option<BatchId> {
if is_stream_terminator {
self.backfill_requests
.remove(&request_id)
.map(|batch_id| batch_id)
self.backfill_requests.remove(&request_id)
} else {
self.backfill_requests.get(&request_id).copied()
}
}
/// Received a blocks by range response for a request that couples blocks and blobs.
/// Received a blocks by range or blobs by range response for a request that couples blocks '
/// and blobs.
pub fn backfill_sync_block_and_blob_response(
&mut self,
request_id: Id,
block_or_blob: BlockOrBlobs<T::EthSpec>,
) -> Option<(BatchId, Result<Vec<BlockWrapper<T::EthSpec>>, &'static str>)> {
) -> Option<BlocksAndBlobsByRangeResponse<T::EthSpec>> {
match self.backfill_blocks_and_blobs_requests.entry(request_id) {
Entry::Occupied(mut entry) => {
let (_, info) = entry.get_mut();
@@ -373,7 +396,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
if info.is_finished() {
// If the request is finished, dequeue everything
let (batch_id, info) = entry.remove();
Some((batch_id, info.into_responses()))
Some(BlocksAndBlobsByRangeResponse {
batch_id,
responses: info.into_responses(),
})
} else {
None
}
@@ -535,15 +561,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
/// Check whether a batch for this epoch (and only this epoch) should request just blocks or
/// blocks and blobs.
pub fn batch_type(&self, _epoch: types::Epoch) -> ByRangeRequestType {
const _: () = assert!(
super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1
&& super::range_sync::EPOCHS_PER_BATCH == 1,
"To deal with alignment with 4844 boundaries, batches need to be of just one epoch"
);
if super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH * super::range_sync::EPOCHS_PER_BATCH
!= 1
{
panic!(
"To deal with alignment with 4844 boundaries, batches need to be of just one epoch"
);
}
#[cfg(test)]
{
// Keep tests only for blocks.
return ByRangeRequestType::Blocks;
ByRangeRequestType::Blocks
}
#[cfg(not(test))]
{

View File

@@ -615,6 +615,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
///
/// If a previous batch has been validated and it had been re-processed, penalize the original
/// peer.
#[allow(clippy::modulo_one)]
fn advance_chain(&mut self, network: &mut SyncNetworkContext<T>, validating_epoch: Epoch) {
// make sure this epoch produces an advancement
if validating_epoch <= self.start_epoch {