mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-11 18:04:18 +00:00
Refactor deneb block processing (#4511)
* Revert "fix merge"
This reverts commit 405e95b0ce.
* refactor deneb block processing
* cargo fmt
* fix ci
This commit is contained in:
@@ -5,8 +5,8 @@ use crate::{
|
||||
sync::SyncMessage,
|
||||
};
|
||||
|
||||
use beacon_chain::blob_verification::AsBlock;
|
||||
use beacon_chain::blob_verification::{BlobError, GossipVerifiedBlob};
|
||||
use beacon_chain::block_verification_types::AsBlock;
|
||||
use beacon_chain::store::Error;
|
||||
use beacon_chain::{
|
||||
attestation_verification::{self, Error as AttnError, VerifiedAttestation},
|
||||
|
||||
@@ -2,7 +2,7 @@ use crate::{
|
||||
service::NetworkMessage,
|
||||
sync::{manager::BlockProcessType, SyncMessage},
|
||||
};
|
||||
use beacon_chain::blob_verification::BlockWrapper;
|
||||
use beacon_chain::block_verification_types::RpcBlock;
|
||||
use beacon_chain::{
|
||||
builder::Witness, eth1_chain::CachingEth1Backend, test_utils::BeaconChainHarness, BeaconChain,
|
||||
};
|
||||
@@ -409,7 +409,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn send_rpc_beacon_block(
|
||||
self: &Arc<Self>,
|
||||
block_root: Hash256,
|
||||
block: BlockWrapper<T::EthSpec>,
|
||||
block: RpcBlock<T::EthSpec>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
@@ -450,7 +450,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn send_chain_segment(
|
||||
self: &Arc<Self>,
|
||||
process_id: ChainSegmentProcessId,
|
||||
blocks: Vec<BlockWrapper<T::EthSpec>>,
|
||||
blocks: Vec<RpcBlock<T::EthSpec>>,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. });
|
||||
let processor = self.clone();
|
||||
|
||||
@@ -6,8 +6,9 @@ use crate::sync::{
|
||||
manager::{BlockProcessType, SyncMessage},
|
||||
ChainId,
|
||||
};
|
||||
use beacon_chain::blob_verification::{AsBlock, BlockWrapper, MaybeAvailableBlock};
|
||||
use beacon_chain::block_verification_types::{AsBlock, RpcBlock};
|
||||
use beacon_chain::data_availability_checker::AvailabilityCheckError;
|
||||
use beacon_chain::data_availability_checker::MaybeAvailableBlock;
|
||||
use beacon_chain::{
|
||||
observed_block_producers::Error as ObserveError, validator_monitor::get_block_delay_ms,
|
||||
AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError,
|
||||
@@ -54,7 +55,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn generate_rpc_beacon_block_process_fn(
|
||||
self: Arc<Self>,
|
||||
block_root: Hash256,
|
||||
block: BlockWrapper<T::EthSpec>,
|
||||
block: RpcBlock<T::EthSpec>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
) -> AsyncFn {
|
||||
@@ -78,7 +79,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub fn generate_rpc_beacon_block_fns(
|
||||
self: Arc<Self>,
|
||||
block_root: Hash256,
|
||||
block: BlockWrapper<T::EthSpec>,
|
||||
block: RpcBlock<T::EthSpec>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
) -> (AsyncFn, BlockingFn) {
|
||||
@@ -106,7 +107,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub async fn process_rpc_block(
|
||||
self: Arc<NetworkBeaconProcessor<T>>,
|
||||
block_root: Hash256,
|
||||
block: BlockWrapper<T::EthSpec>,
|
||||
block: RpcBlock<T::EthSpec>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
|
||||
@@ -315,7 +316,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
pub async fn process_chain_segment(
|
||||
&self,
|
||||
sync_type: ChainSegmentProcessId,
|
||||
downloaded_blocks: Vec<BlockWrapper<T::EthSpec>>,
|
||||
downloaded_blocks: Vec<RpcBlock<T::EthSpec>>,
|
||||
notify_execution_layer: NotifyExecutionLayer,
|
||||
) {
|
||||
let result = match sync_type {
|
||||
@@ -440,7 +441,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
/// Helper function to process blocks batches which only consumes the chain and blocks to process.
|
||||
async fn process_blocks<'a>(
|
||||
&self,
|
||||
downloaded_blocks: impl Iterator<Item = &'a BlockWrapper<T::EthSpec>>,
|
||||
downloaded_blocks: impl Iterator<Item = &'a RpcBlock<T::EthSpec>>,
|
||||
notify_execution_layer: NotifyExecutionLayer,
|
||||
) -> (usize, Result<(), ChainSegmentFailed>) {
|
||||
let blocks: Vec<_> = downloaded_blocks.cloned().collect();
|
||||
@@ -473,7 +474,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
/// Helper function to process backfill block batches which only consumes the chain and blocks to process.
|
||||
fn process_backfill_blocks(
|
||||
&self,
|
||||
downloaded_blocks: Vec<BlockWrapper<T::EthSpec>>,
|
||||
downloaded_blocks: Vec<RpcBlock<T::EthSpec>>,
|
||||
) -> (usize, Result<(), ChainSegmentFailed>) {
|
||||
let total_blocks = downloaded_blocks.len();
|
||||
let available_blocks = match downloaded_blocks
|
||||
@@ -481,7 +482,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
.map(|block| {
|
||||
self.chain
|
||||
.data_availability_checker
|
||||
.check_availability(block)
|
||||
.check_rpc_block_availability(block)
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
{
|
||||
|
||||
@@ -14,7 +14,7 @@ use crate::sync::network_context::SyncNetworkContext;
|
||||
use crate::sync::range_sync::{
|
||||
BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
|
||||
};
|
||||
use beacon_chain::blob_verification::BlockWrapper;
|
||||
use beacon_chain::block_verification_types::RpcBlock;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use lighthouse_network::types::{BackFillState, NetworkGlobals};
|
||||
use lighthouse_network::{PeerAction, PeerId};
|
||||
@@ -55,7 +55,7 @@ impl BatchConfig for BackFillBatchConfig {
|
||||
fn max_batch_processing_attempts() -> u8 {
|
||||
MAX_BATCH_PROCESSING_ATTEMPTS
|
||||
}
|
||||
fn batch_attempt_hash<T: EthSpec>(blocks: &[BlockWrapper<T>]) -> u64 {
|
||||
fn batch_attempt_hash<T: EthSpec>(blocks: &[RpcBlock<T>]) -> u64 {
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::hash::{Hash, Hasher};
|
||||
let mut hasher = DefaultHasher::new();
|
||||
@@ -392,7 +392,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
|
||||
batch_id: BatchId,
|
||||
peer_id: &PeerId,
|
||||
request_id: Id,
|
||||
beacon_block: Option<BlockWrapper<T::EthSpec>>,
|
||||
beacon_block: Option<RpcBlock<T::EthSpec>>,
|
||||
) -> Result<ProcessResult, BackFillError> {
|
||||
// check if we have this batch
|
||||
let batch = match self.batches.get_mut(&batch_id) {
|
||||
|
||||
@@ -10,7 +10,7 @@ use super::{
|
||||
use crate::metrics;
|
||||
use crate::network_beacon_processor::ChainSegmentProcessId;
|
||||
use crate::sync::block_lookups::single_block_lookup::LookupId;
|
||||
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
|
||||
use beacon_chain::block_verification_types::{AsBlock, RpcBlock};
|
||||
use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker};
|
||||
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
|
||||
use lighthouse_network::rpc::RPCError;
|
||||
@@ -34,7 +34,7 @@ mod single_block_lookup;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
pub type DownloadedBlocks<T> = (Hash256, BlockWrapper<T>);
|
||||
pub type DownloadedBlocks<T> = (Hash256, RpcBlock<T>);
|
||||
pub type RootBlockTuple<T> = (Hash256, Arc<SignedBeaconBlock<T>>);
|
||||
pub type RootBlobsTuple<T> = (Hash256, FixedBlobSidecarList<T>);
|
||||
|
||||
@@ -381,7 +381,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
if !has_pending_parent_request {
|
||||
let rpc_block = request_ref
|
||||
.get_downloaded_block()
|
||||
.unwrap_or(BlockWrapper::Block(block));
|
||||
.unwrap_or(RpcBlock::new_without_blobs(block));
|
||||
// This is the correct block, send it for processing
|
||||
match self.send_block_for_processing(
|
||||
block_root,
|
||||
@@ -910,11 +910,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
BlockError::ParentUnknown(block) => {
|
||||
let slot = block.slot();
|
||||
let parent_root = block.parent_root();
|
||||
let (block, blobs) = block.deconstruct();
|
||||
request_ref.add_unknown_parent_components(UnknownParentComponents::new(
|
||||
Some(block),
|
||||
blobs,
|
||||
));
|
||||
request_ref.add_unknown_parent_components(block.into());
|
||||
self.search_parent(slot, root, parent_root, peer_id.to_peer_id(), cx);
|
||||
ShouldRemoveLookup::False
|
||||
}
|
||||
@@ -1226,7 +1222,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
fn send_block_for_processing(
|
||||
&mut self,
|
||||
block_root: Hash256,
|
||||
block: BlockWrapper<T::EthSpec>,
|
||||
block: RpcBlock<T::EthSpec>,
|
||||
duration: Duration,
|
||||
process_type: BlockProcessType,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
|
||||
@@ -3,8 +3,8 @@ use super::{BlobRequestId, BlockRequestId, DownloadedBlocks, PeerShouldHave, Res
|
||||
use crate::sync::block_lookups::single_block_lookup::{State, UnknownParentComponents};
|
||||
use crate::sync::block_lookups::{RootBlobsTuple, RootBlockTuple};
|
||||
use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext};
|
||||
use beacon_chain::blob_verification::AsBlock;
|
||||
use beacon_chain::blob_verification::BlockWrapper;
|
||||
use beacon_chain::block_verification_types::AsBlock;
|
||||
use beacon_chain::block_verification_types::RpcBlock;
|
||||
use beacon_chain::data_availability_checker::DataAvailabilityChecker;
|
||||
use beacon_chain::BeaconChainTypes;
|
||||
use lighthouse_network::PeerId;
|
||||
@@ -147,7 +147,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
|
||||
.check_peer_disconnected(peer_id)
|
||||
}
|
||||
|
||||
pub fn add_unknown_parent_block(&mut self, block: BlockWrapper<T::EthSpec>) {
|
||||
pub fn add_unknown_parent_block(&mut self, block: RpcBlock<T::EthSpec>) {
|
||||
let next_parent = block.parent_root();
|
||||
|
||||
// Cache the block.
|
||||
@@ -203,7 +203,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
|
||||
self,
|
||||
) -> (
|
||||
Hash256,
|
||||
Vec<BlockWrapper<T::EthSpec>>,
|
||||
Vec<RpcBlock<T::EthSpec>>,
|
||||
Vec<Hash256>,
|
||||
SingleBlockLookup<PARENT_FAIL_TOLERANCE, T>,
|
||||
) {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::sync::block_lookups::{BlobRequestId, BlockRequestId, RootBlobsTuple, RootBlockTuple};
|
||||
use crate::sync::network_context::SyncNetworkContext;
|
||||
use beacon_chain::blob_verification::BlockWrapper;
|
||||
use beacon_chain::block_verification_types::RpcBlock;
|
||||
use beacon_chain::data_availability_checker::DataAvailabilityChecker;
|
||||
use beacon_chain::{get_block_root, BeaconChainTypes};
|
||||
use lighthouse_network::rpc::methods::BlobsByRootRequest;
|
||||
@@ -138,6 +138,16 @@ pub struct UnknownParentComponents<E: EthSpec> {
|
||||
pub downloaded_blobs: FixedBlobSidecarList<E>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> From<RpcBlock<E>> for UnknownParentComponents<E> {
|
||||
fn from(value: RpcBlock<E>) -> Self {
|
||||
let (block, blobs) = value.deconstruct();
|
||||
let fixed_blobs = blobs.map(|blobs| {
|
||||
FixedBlobSidecarList::from(blobs.into_iter().map(Some).collect::<Vec<_>>())
|
||||
});
|
||||
Self::new(Some(block), fixed_blobs)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> UnknownParentComponents<E> {
|
||||
pub fn new(
|
||||
block: Option<Arc<SignedBeaconBlock<E>>>,
|
||||
@@ -284,7 +294,7 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
|
||||
};
|
||||
}
|
||||
|
||||
pub fn get_downloaded_block(&mut self) -> Option<BlockWrapper<T::EthSpec>> {
|
||||
pub fn get_downloaded_block(&mut self) -> Option<RpcBlock<T::EthSpec>> {
|
||||
self.unknown_parent_components
|
||||
.as_mut()
|
||||
.and_then(|components| {
|
||||
@@ -302,8 +312,16 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
|
||||
downloaded_block,
|
||||
downloaded_blobs,
|
||||
} = components;
|
||||
downloaded_block.as_ref().map(|block| {
|
||||
BlockWrapper::BlockAndBlobs(block.clone(), std::mem::take(downloaded_blobs))
|
||||
downloaded_block.as_ref().and_then(|block| {
|
||||
//TODO(sean) figure out how to properly deal with a consistency error here,
|
||||
// should we downscore the peer sending blobs?
|
||||
let blobs = std::mem::take(downloaded_blobs);
|
||||
let filtered = blobs
|
||||
.into_iter()
|
||||
.filter_map(|b| b.clone())
|
||||
.collect::<Vec<_>>();
|
||||
let blobs = VariableList::from(filtered);
|
||||
RpcBlock::new(block.clone(), Some(blobs)).ok()
|
||||
})
|
||||
} else {
|
||||
None
|
||||
|
||||
@@ -1474,7 +1474,7 @@ mod deneb_only {
|
||||
fn parent_block_unknown_parent(mut self) -> Self {
|
||||
self.bl.parent_block_processed(
|
||||
self.block_root,
|
||||
BlockProcessingResult::Err(BlockError::ParentUnknown(BlockWrapper::Block(
|
||||
BlockProcessingResult::Err(BlockError::ParentUnknown(RpcBlock::new_without_blobs(
|
||||
self.parent_block.clone().expect("parent block"),
|
||||
))),
|
||||
ResponseType::Block,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use beacon_chain::blob_verification::BlockWrapper;
|
||||
use ssz_types::FixedVector;
|
||||
use beacon_chain::block_verification_types::RpcBlock;
|
||||
use ssz_types::VariableList;
|
||||
use std::{collections::VecDeque, sync::Arc};
|
||||
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
|
||||
|
||||
@@ -16,28 +16,28 @@ pub struct BlocksAndBlobsRequestInfo<T: EthSpec> {
|
||||
}
|
||||
|
||||
impl<T: EthSpec> BlocksAndBlobsRequestInfo<T> {
|
||||
pub fn add_block_response(&mut self, maybe_block: Option<Arc<SignedBeaconBlock<T>>>) {
|
||||
match maybe_block {
|
||||
pub fn add_block_response(&mut self, block_opt: Option<Arc<SignedBeaconBlock<T>>>) {
|
||||
match block_opt {
|
||||
Some(block) => self.accumulated_blocks.push_back(block),
|
||||
None => self.is_blocks_stream_terminated = true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_sidecar_response(&mut self, maybe_sidecar: Option<Arc<BlobSidecar<T>>>) {
|
||||
match maybe_sidecar {
|
||||
pub fn add_sidecar_response(&mut self, sidecar_opt: Option<Arc<BlobSidecar<T>>>) {
|
||||
match sidecar_opt {
|
||||
Some(sidecar) => self.accumulated_sidecars.push_back(sidecar),
|
||||
None => self.is_sidecars_stream_terminated = true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_responses(self) -> Result<Vec<BlockWrapper<T>>, &'static str> {
|
||||
pub fn into_responses(self) -> Result<Vec<RpcBlock<T>>, &'static str> {
|
||||
let BlocksAndBlobsRequestInfo {
|
||||
accumulated_blocks,
|
||||
accumulated_sidecars,
|
||||
..
|
||||
} = self;
|
||||
|
||||
// ASSUMPTION: There can't be more more blobs than blocks. i.e. sending any blob (empty
|
||||
// There can't be more more blobs than blocks. i.e. sending any blob (empty
|
||||
// included) for a skipped slot is not permitted.
|
||||
let mut responses = Vec::with_capacity(accumulated_blocks.len());
|
||||
let mut blob_iter = accumulated_sidecars.into_iter().peekable();
|
||||
@@ -50,29 +50,23 @@ impl<T: EthSpec> BlocksAndBlobsRequestInfo<T> {
|
||||
.unwrap_or(false);
|
||||
pair_next_blob
|
||||
} {
|
||||
blob_list.push(blob_iter.next().expect("iterator is not empty"));
|
||||
blob_list.push(blob_iter.next().ok_or("Missing next blob")?);
|
||||
}
|
||||
|
||||
if blob_list.is_empty() {
|
||||
responses.push(BlockWrapper::Block(block))
|
||||
} else {
|
||||
let mut blobs_fixed = vec![None; T::max_blobs_per_block()];
|
||||
for blob in blob_list {
|
||||
let blob_index = blob.index as usize;
|
||||
let Some(blob_opt) = blobs_fixed.get_mut(blob_index) else {
|
||||
let mut blobs_buffer = vec![None; T::max_blobs_per_block()];
|
||||
for blob in blob_list {
|
||||
let blob_index = blob.index as usize;
|
||||
let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else {
|
||||
return Err("Invalid blob index");
|
||||
};
|
||||
if blob_opt.is_some() {
|
||||
return Err("Repeat blob index");
|
||||
} else {
|
||||
*blob_opt = Some(blob);
|
||||
}
|
||||
if blob_opt.is_some() {
|
||||
return Err("Repeat blob index");
|
||||
} else {
|
||||
*blob_opt = Some(blob);
|
||||
}
|
||||
responses.push(BlockWrapper::BlockAndBlobs(
|
||||
block,
|
||||
FixedVector::from(blobs_fixed),
|
||||
))
|
||||
}
|
||||
let blobs = VariableList::from(blobs_buffer.into_iter().flatten().collect::<Vec<_>>());
|
||||
responses.push(RpcBlock::new(block, Some(blobs))?)
|
||||
}
|
||||
|
||||
// if accumulated sidecars is not empty, throw an error.
|
||||
|
||||
@@ -46,8 +46,8 @@ use crate::sync::block_lookups::delayed_lookup::DelayedLookupMessage;
|
||||
pub use crate::sync::block_lookups::ResponseType;
|
||||
use crate::sync::block_lookups::UnknownParentComponents;
|
||||
use crate::sync::range_sync::ByRangeRequestType;
|
||||
use beacon_chain::blob_verification::AsBlock;
|
||||
use beacon_chain::blob_verification::BlockWrapper;
|
||||
use beacon_chain::block_verification_types::AsBlock;
|
||||
use beacon_chain::block_verification_types::RpcBlock;
|
||||
use beacon_chain::{
|
||||
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState,
|
||||
MAXIMUM_GOSSIP_CLOCK_DISPARITY,
|
||||
@@ -127,7 +127,7 @@ pub enum SyncMessage<T: EthSpec> {
|
||||
},
|
||||
|
||||
/// A block with an unknown parent has been received.
|
||||
UnknownParentBlock(PeerId, BlockWrapper<T>, Hash256),
|
||||
UnknownParentBlock(PeerId, RpcBlock<T>, Hash256),
|
||||
|
||||
/// A blob with an unknown parent has been received.
|
||||
UnknownParentBlob(PeerId, Arc<BlobSidecar<T>>),
|
||||
@@ -614,15 +614,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
} => self.rpc_blob_received(request_id, peer_id, blob_sidecar, seen_timestamp),
|
||||
SyncMessage::UnknownParentBlock(peer_id, block, block_root) => {
|
||||
let block_slot = block.slot();
|
||||
let (block, blobs) = block.deconstruct();
|
||||
let parent_root = block.parent_root();
|
||||
let parent_components = UnknownParentComponents::new(Some(block), blobs);
|
||||
self.handle_unknown_parent(
|
||||
peer_id,
|
||||
block_root,
|
||||
parent_root,
|
||||
block_slot,
|
||||
Some(parent_components),
|
||||
Some(block.into()),
|
||||
);
|
||||
}
|
||||
SyncMessage::UnknownParentBlob(peer_id, blob) => {
|
||||
@@ -910,7 +908,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
batch_id,
|
||||
&peer_id,
|
||||
id,
|
||||
block.map(BlockWrapper::Block),
|
||||
block.map(Into::into),
|
||||
) {
|
||||
Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
|
||||
Ok(ProcessResult::Successful) => {}
|
||||
@@ -934,7 +932,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
chain_id,
|
||||
batch_id,
|
||||
id,
|
||||
block.map(BlockWrapper::Block),
|
||||
block.map(Into::into),
|
||||
);
|
||||
self.update_sync_state();
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ use crate::network_beacon_processor::NetworkBeaconProcessor;
|
||||
use crate::service::{NetworkMessage, RequestId};
|
||||
use crate::status::ToStatusMessage;
|
||||
use crate::sync::block_lookups::{BlobRequestId, BlockRequestId};
|
||||
use beacon_chain::blob_verification::BlockWrapper;
|
||||
use beacon_chain::block_verification_types::RpcBlock;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
|
||||
use fnv::FnvHashMap;
|
||||
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
|
||||
@@ -22,7 +22,7 @@ use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
|
||||
|
||||
pub struct BlocksAndBlobsByRangeResponse<T: EthSpec> {
|
||||
pub batch_id: BatchId,
|
||||
pub responses: Result<Vec<BlockWrapper<T>>, &'static str>,
|
||||
pub responses: Result<Vec<RpcBlock<T>>, &'static str>,
|
||||
}
|
||||
|
||||
pub struct BlocksAndBlobsByRangeRequest<T: EthSpec> {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::sync::manager::Id;
|
||||
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
|
||||
use beacon_chain::block_verification_types::{AsBlock, RpcBlock};
|
||||
use lighthouse_network::rpc::methods::BlocksByRangeRequest;
|
||||
use lighthouse_network::PeerId;
|
||||
use std::collections::HashSet;
|
||||
@@ -56,7 +56,7 @@ pub trait BatchConfig {
|
||||
/// Note that simpler hashing functions considered in the past (hash of first block, hash of last
|
||||
/// block, number of received blocks) are not good enough to differentiate attempts. For this
|
||||
/// reason, we hash the complete set of blocks both in RangeSync and BackFillSync.
|
||||
fn batch_attempt_hash<T: EthSpec>(blocks: &[BlockWrapper<T>]) -> u64;
|
||||
fn batch_attempt_hash<T: EthSpec>(blocks: &[RpcBlock<T>]) -> u64;
|
||||
}
|
||||
|
||||
pub struct RangeSyncBatchConfig {}
|
||||
@@ -68,7 +68,7 @@ impl BatchConfig for RangeSyncBatchConfig {
|
||||
fn max_batch_processing_attempts() -> u8 {
|
||||
MAX_BATCH_PROCESSING_ATTEMPTS
|
||||
}
|
||||
fn batch_attempt_hash<T: EthSpec>(blocks: &[BlockWrapper<T>]) -> u64 {
|
||||
fn batch_attempt_hash<T: EthSpec>(blocks: &[RpcBlock<T>]) -> u64 {
|
||||
let mut hasher = std::collections::hash_map::DefaultHasher::new();
|
||||
blocks.hash(&mut hasher);
|
||||
hasher.finish()
|
||||
@@ -116,9 +116,9 @@ pub enum BatchState<T: EthSpec> {
|
||||
/// The batch has failed either downloading or processing, but can be requested again.
|
||||
AwaitingDownload,
|
||||
/// The batch is being downloaded.
|
||||
Downloading(PeerId, Vec<BlockWrapper<T>>, Id),
|
||||
Downloading(PeerId, Vec<RpcBlock<T>>, Id),
|
||||
/// The batch has been completely downloaded and is ready for processing.
|
||||
AwaitingProcessing(PeerId, Vec<BlockWrapper<T>>),
|
||||
AwaitingProcessing(PeerId, Vec<RpcBlock<T>>),
|
||||
/// The batch is being processed.
|
||||
Processing(Attempt),
|
||||
/// The batch was successfully processed and is waiting to be validated.
|
||||
@@ -251,7 +251,7 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
|
||||
}
|
||||
|
||||
/// Adds a block to a downloading batch.
|
||||
pub fn add_block(&mut self, block: BlockWrapper<T>) -> Result<(), WrongState> {
|
||||
pub fn add_block(&mut self, block: RpcBlock<T>) -> Result<(), WrongState> {
|
||||
match self.state.poison() {
|
||||
BatchState::Downloading(peer, mut blocks, req_id) => {
|
||||
blocks.push(block);
|
||||
@@ -383,7 +383,7 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_processing(&mut self) -> Result<Vec<BlockWrapper<T>>, WrongState> {
|
||||
pub fn start_processing(&mut self) -> Result<Vec<RpcBlock<T>>, WrongState> {
|
||||
match self.state.poison() {
|
||||
BatchState::AwaitingProcessing(peer, blocks) => {
|
||||
self.state = BatchState::Processing(Attempt::new::<B, T>(peer, &blocks));
|
||||
@@ -481,7 +481,7 @@ pub struct Attempt {
|
||||
}
|
||||
|
||||
impl Attempt {
|
||||
fn new<B: BatchConfig, T: EthSpec>(peer_id: PeerId, blocks: &[BlockWrapper<T>]) -> Self {
|
||||
fn new<B: BatchConfig, T: EthSpec>(peer_id: PeerId, blocks: &[RpcBlock<T>]) -> Self {
|
||||
let hash = B::batch_attempt_hash(blocks);
|
||||
Attempt { peer_id, hash }
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use crate::network_beacon_processor::ChainSegmentProcessId;
|
||||
use crate::sync::{
|
||||
manager::Id, network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult,
|
||||
};
|
||||
use beacon_chain::blob_verification::BlockWrapper;
|
||||
use beacon_chain::block_verification_types::RpcBlock;
|
||||
use beacon_chain::BeaconChainTypes;
|
||||
use fnv::FnvHashMap;
|
||||
use lighthouse_network::{PeerAction, PeerId};
|
||||
@@ -221,7 +221,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
batch_id: BatchId,
|
||||
peer_id: &PeerId,
|
||||
request_id: Id,
|
||||
beacon_block: Option<BlockWrapper<T::EthSpec>>,
|
||||
beacon_block: Option<RpcBlock<T::EthSpec>>,
|
||||
) -> ProcessingResult {
|
||||
// check if we have this batch
|
||||
let batch = match self.batches.get_mut(&batch_id) {
|
||||
|
||||
@@ -47,7 +47,7 @@ use crate::status::ToStatusMessage;
|
||||
use crate::sync::manager::Id;
|
||||
use crate::sync::network_context::SyncNetworkContext;
|
||||
use crate::sync::BatchProcessResult;
|
||||
use beacon_chain::blob_verification::BlockWrapper;
|
||||
use beacon_chain::block_verification_types::RpcBlock;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use lighthouse_network::rpc::GoodbyeReason;
|
||||
use lighthouse_network::PeerId;
|
||||
@@ -210,7 +210,7 @@ where
|
||||
chain_id: ChainId,
|
||||
batch_id: BatchId,
|
||||
request_id: Id,
|
||||
beacon_block: Option<BlockWrapper<T::EthSpec>>,
|
||||
beacon_block: Option<RpcBlock<T::EthSpec>>,
|
||||
) {
|
||||
// check if this chunk removes the chain
|
||||
match self.chains.call_by_id(chain_id, |chain| {
|
||||
@@ -387,19 +387,18 @@ mod tests {
|
||||
use beacon_chain::builder::Witness;
|
||||
use beacon_chain::eth1_chain::CachingEth1Backend;
|
||||
use beacon_chain::parking_lot::RwLock;
|
||||
use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType};
|
||||
use beacon_chain::EngineState;
|
||||
use beacon_processor::WorkEvent as BeaconWorkEvent;
|
||||
use lighthouse_network::rpc::BlocksByRangeRequest;
|
||||
use lighthouse_network::Request;
|
||||
use lighthouse_network::{rpc::StatusMessage, NetworkGlobals};
|
||||
use slog::{o, Drain};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType};
|
||||
use slot_clock::TestingSlotClock;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use store::MemoryStore;
|
||||
use tokio::sync::mpsc;
|
||||
use types::{Hash256, MinimalEthSpec as E};
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
Reference in New Issue
Block a user