diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 000c4d85dc..2255b40170 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -5,8 +5,8 @@ use crate::{ sync::SyncMessage, }; -use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::{BlobError, GossipVerifiedBlob}; +use beacon_chain::block_verification_types::AsBlock; use beacon_chain::store::Error; use beacon_chain::{ attestation_verification::{self, Error as AttnError, VerifiedAttestation}, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index ef4b3daae7..166417ba93 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -2,7 +2,7 @@ use crate::{ service::NetworkMessage, sync::{manager::BlockProcessType, SyncMessage}, }; -use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{ builder::Witness, eth1_chain::CachingEth1Backend, test_utils::BeaconChainHarness, BeaconChain, }; @@ -13,7 +13,6 @@ use beacon_processor::{ MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN, }; use environment::null_logger; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; use lighthouse_network::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, @@ -27,6 +26,7 @@ use store::MemoryStore; use task_executor::test_utils::TestRuntime; use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, error::TrySendError}; +use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; use types::*; pub use sync_methods::ChainSegmentProcessId; @@ -229,7 +229,9 @@ impl NetworkBeaconProcessor { }) } - pub fn send_banana() {} + pub fn send_banana(){ + + } /// Create a new `Work` event for some sync committee signature. pub fn send_gossip_sync_signature( @@ -411,7 +413,7 @@ impl NetworkBeaconProcessor { pub fn send_rpc_beacon_block( self: &Arc, block_root: Hash256, - block: BlockWrapper, + block: RpcBlock, seen_timestamp: Duration, process_type: BlockProcessType, ) -> Result<(), Error> { @@ -452,7 +454,7 @@ impl NetworkBeaconProcessor { pub fn send_chain_segment( self: &Arc, process_id: ChainSegmentProcessId, - blocks: Vec>, + blocks: Vec>, ) -> Result<(), Error> { let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. }); let processor = self.clone(); @@ -561,7 +563,12 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { let processor = self.clone(); let process_fn = move |send_idle_on_drop| { - processor.handle_blobs_by_range_request(send_idle_on_drop, peer_id, request_id, request) + processor.handle_blobs_by_range_request( + send_idle_on_drop, + peer_id, + request_id, + request, + ) }; self.try_send(BeaconWorkEvent { @@ -579,7 +586,12 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { let processor = self.clone(); let process_fn = move |send_idle_on_drop| { - processor.handle_blobs_by_root_request(send_idle_on_drop, peer_id, request_id, request) + processor.handle_blobs_by_root_request( + send_idle_on_drop, + peer_id, + request_id, + request, + ) }; self.try_send(BeaconWorkEvent { diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 8d9146e688..3a09373c56 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -6,7 +6,8 @@ use crate::sync::{ manager::{BlockProcessType, SyncMessage}, ChainId, }; -use beacon_chain::blob_verification::{AsBlock, BlockWrapper, MaybeAvailableBlock}; +use beacon_chain::data_availability_checker::MaybeAvailableBlock; +use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::{ observed_block_producers::Error as ObserveError, validator_monitor::get_block_delay_ms, @@ -54,7 +55,7 @@ impl NetworkBeaconProcessor { pub fn generate_rpc_beacon_block_process_fn( self: Arc, block_root: Hash256, - block: BlockWrapper, + block: RpcBlock, seen_timestamp: Duration, process_type: BlockProcessType, ) -> AsyncFn { @@ -78,7 +79,7 @@ impl NetworkBeaconProcessor { pub fn generate_rpc_beacon_block_fns( self: Arc, block_root: Hash256, - block: BlockWrapper, + block: RpcBlock, seen_timestamp: Duration, process_type: BlockProcessType, ) -> (AsyncFn, BlockingFn) { @@ -106,7 +107,7 @@ impl NetworkBeaconProcessor { pub async fn process_rpc_block( self: Arc>, block_root: Hash256, - block: BlockWrapper, + block: RpcBlock, seen_timestamp: Duration, process_type: BlockProcessType, reprocess_tx: mpsc::Sender, @@ -269,8 +270,7 @@ impl NetworkBeaconProcessor { process_type: BlockProcessType, ) -> AsyncFn { let process_fn = async move { - self.clone() - .process_rpc_blobs(block_root, block, seen_timestamp, process_type) + self.clone().process_rpc_blobs(block_root, block, seen_timestamp, process_type) .await; }; Box::pin(process_fn) @@ -306,7 +306,7 @@ impl NetworkBeaconProcessor { }); } - pub fn send_delayed_lookup(&self, block_root: Hash256) { + pub fn send_delayed_lookup(&self, block_root: Hash256){ self.send_sync_message(SyncMessage::MissingGossipBlockComponentsDelayed(block_root)) } @@ -315,7 +315,7 @@ impl NetworkBeaconProcessor { pub async fn process_chain_segment( &self, sync_type: ChainSegmentProcessId, - downloaded_blocks: Vec>, + downloaded_blocks: Vec>, notify_execution_layer: NotifyExecutionLayer, ) { let result = match sync_type { @@ -440,7 +440,7 @@ impl NetworkBeaconProcessor { /// Helper function to process blocks batches which only consumes the chain and blocks to process. async fn process_blocks<'a>( &self, - downloaded_blocks: impl Iterator>, + downloaded_blocks: impl Iterator>, notify_execution_layer: NotifyExecutionLayer, ) -> (usize, Result<(), ChainSegmentFailed>) { let blocks: Vec<_> = downloaded_blocks.cloned().collect(); @@ -473,7 +473,7 @@ impl NetworkBeaconProcessor { /// Helper function to process backfill block batches which only consumes the chain and blocks to process. fn process_backfill_blocks( &self, - downloaded_blocks: Vec>, + downloaded_blocks: Vec>, ) -> (usize, Result<(), ChainSegmentFailed>) { let total_blocks = downloaded_blocks.len(); let available_blocks = match downloaded_blocks @@ -481,7 +481,7 @@ impl NetworkBeaconProcessor { .map(|block| { self.chain .data_availability_checker - .check_availability(block) + .check_rpc_block_availability(block) }) .collect::, _>>() { diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 2c37d177aa..39d3575d6e 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -336,16 +336,14 @@ impl TestRig { } pub fn enqueue_blobs_by_range_request(&self, count: u64) { - self.network_beacon_processor - .send_blobs_by_range_request( - PeerId::random(), - (ConnectionId::new(42), SubstreamId::new(24)), - BlobsByRangeRequest { - start_slot: 0, - count, - }, - ) - .unwrap(); + self.network_beacon_processor.send_blobs_by_range_request( + PeerId::random(), + (ConnectionId::new(42), SubstreamId::new(24)), + BlobsByRangeRequest { + start_slot: 0, + count, + }, + ).unwrap(); } pub fn enqueue_backfill_batch(&self) { diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 30a75a9105..5a954d05a4 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -209,12 +209,18 @@ impl Router { .send_blocks_by_roots_request(peer_id, request_id, request), ), Request::BlobsByRange(request) => self.handle_beacon_processor_send_result( - self.network_beacon_processor - .send_blobs_by_range_request(peer_id, request_id, request), + self.network_beacon_processor.send_blobs_by_range_request( + peer_id, + request_id, + request, + ), ), Request::BlobsByRoot(request) => self.handle_beacon_processor_send_result( - self.network_beacon_processor - .send_blobs_by_roots_request(peer_id, request_id, request), + self.network_beacon_processor.send_blobs_by_roots_request( + peer_id, + request_id, + request, + ), ), Request::LightClientBootstrap(request) => self.handle_beacon_processor_send_result( self.network_beacon_processor @@ -305,7 +311,7 @@ impl Router { blob_index, signed_blob, timestamp_now(), - ), + ) ) } PubsubMessage::VoluntaryExit(exit) => { diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index 544c5dd9c7..10110aa891 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -3,7 +3,6 @@ mod tests { use crate::persisted_dht::load_dht; use crate::{NetworkConfig, NetworkService}; - use beacon_chain::test_utils::EphemeralHarnessType; use beacon_processor::{ BeaconProcessorSend, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN, }; @@ -13,6 +12,7 @@ mod tests { use std::str::FromStr; use std::sync::Arc; use tokio::{runtime::Runtime, sync::mpsc}; + use beacon_chain::test_utils::EphemeralHarnessType; use types::MinimalEthSpec as E; type BeaconChainHarness = beacon_chain::test_utils::BeaconChainHarness>; diff --git a/beacon_node/network/src/sync/block_lookups/delayed_lookup.rs b/beacon_node/network/src/sync/block_lookups/delayed_lookup.rs index 55e9e49db3..c492470b4a 100644 --- a/beacon_node/network/src/sync/block_lookups/delayed_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/delayed_lookup.rs @@ -1,12 +1,12 @@ -use crate::network_beacon_processor::NetworkBeaconProcessor; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use slog::crit; +use slog::{crit, }; use slot_clock::SlotClock; use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::interval_at; use tokio::time::Instant; -use types::Hash256; +use types::{ Hash256}; +use crate::network_beacon_processor::NetworkBeaconProcessor; #[derive(Debug)] pub enum DelayedLookupMessage { diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index dfe960832a..ff095c719e 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -10,7 +10,7 @@ use super::{ use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::block_lookups::single_block_lookup::LookupId; -use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; +use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; use lighthouse_network::rpc::RPCError; @@ -34,7 +34,7 @@ mod single_block_lookup; #[cfg(test)] mod tests; -pub type DownloadedBlocks = (Hash256, BlockWrapper); +pub type DownloadedBlocks = (Hash256, RpcBlock); pub type RootBlockTuple = (Hash256, Arc>); pub type RootBlobsTuple = (Hash256, FixedBlobSidecarList); @@ -381,7 +381,7 @@ impl BlockLookups { if !has_pending_parent_request { let rpc_block = request_ref .get_downloaded_block() - .unwrap_or(BlockWrapper::Block(block)); + .unwrap_or(RpcBlock::new_without_blobs(block)); // This is the correct block, send it for processing match self.send_block_for_processing( block_root, @@ -910,11 +910,7 @@ impl BlockLookups { BlockError::ParentUnknown(block) => { let slot = block.slot(); let parent_root = block.parent_root(); - let (block, blobs) = block.deconstruct(); - request_ref.add_unknown_parent_components(UnknownParentComponents::new( - Some(block), - blobs, - )); + request_ref.add_unknown_parent_components(block.into()); self.search_parent(slot, root, parent_root, peer_id.to_peer_id(), cx); ShouldRemoveLookup::False } @@ -1226,7 +1222,7 @@ impl BlockLookups { fn send_block_for_processing( &mut self, block_root: Hash256, - block: BlockWrapper, + block: RpcBlock, duration: Duration, process_type: BlockProcessType, cx: &mut SyncNetworkContext, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 93b7c9af5b..8bdf57e2a4 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -127,7 +127,7 @@ pub enum SyncMessage { }, /// A block with an unknown parent has been received. - UnknownParentBlock(PeerId, BlockWrapper, Hash256), + UnknownParentBlock(PeerId, RpcBlock, Hash256), /// A blob with an unknown parent has been received. UnknownParentBlob(PeerId, Arc>), diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 733cbcc9ea..09a85208d9 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -396,7 +396,7 @@ mod tests { use tokio::sync::mpsc; use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType}; - use slot_clock::TestingSlotClock; + use slot_clock::{TestingSlotClock, }; use std::collections::HashSet; use std::sync::Arc; use store::MemoryStore;