From 405e95b0ce15409f06504f45c8d93071523e9539 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 14 Jul 2023 16:15:28 -0400 Subject: [PATCH] fix merge --- .../gossip_methods.rs | 2 +- .../src/network_beacon_processor/mod.rs | 26 +++++-------------- .../network_beacon_processor/sync_methods.rs | 22 ++++++++-------- .../src/network_beacon_processor/tests.rs | 18 +++++++------ beacon_node/network/src/router.rs | 16 ++++-------- beacon_node/network/src/service/tests.rs | 2 +- .../src/sync/block_lookups/delayed_lookup.rs | 6 ++--- .../network/src/sync/block_lookups/mod.rs | 14 ++++++---- beacon_node/network/src/sync/manager.rs | 2 +- .../network/src/sync/range_sync/range.rs | 2 +- 10 files changed, 49 insertions(+), 61 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 2255b40170..000c4d85dc 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -5,8 +5,8 @@ use crate::{ sync::SyncMessage, }; +use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::{BlobError, GossipVerifiedBlob}; -use beacon_chain::block_verification_types::AsBlock; use beacon_chain::store::Error; use beacon_chain::{ attestation_verification::{self, Error as AttnError, VerifiedAttestation}, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 166417ba93..ef4b3daae7 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -2,7 +2,7 @@ use crate::{ service::NetworkMessage, sync::{manager::BlockProcessType, SyncMessage}, }; -use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::{ builder::Witness, eth1_chain::CachingEth1Backend, test_utils::BeaconChainHarness, BeaconChain, }; @@ -13,6 +13,7 @@ use beacon_processor::{ MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN, }; use environment::null_logger; +use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; use lighthouse_network::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, @@ -26,7 +27,6 @@ use store::MemoryStore; use task_executor::test_utils::TestRuntime; use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, error::TrySendError}; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; use types::*; pub use sync_methods::ChainSegmentProcessId; @@ -229,9 +229,7 @@ impl NetworkBeaconProcessor { }) } - pub fn send_banana(){ - - } + pub fn send_banana() {} /// Create a new `Work` event for some sync committee signature. pub fn send_gossip_sync_signature( @@ -413,7 +411,7 @@ impl NetworkBeaconProcessor { pub fn send_rpc_beacon_block( self: &Arc, block_root: Hash256, - block: RpcBlock, + block: BlockWrapper, seen_timestamp: Duration, process_type: BlockProcessType, ) -> Result<(), Error> { @@ -454,7 +452,7 @@ impl NetworkBeaconProcessor { pub fn send_chain_segment( self: &Arc, process_id: ChainSegmentProcessId, - blocks: Vec>, + blocks: Vec>, ) -> Result<(), Error> { let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. }); let processor = self.clone(); @@ -563,12 +561,7 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { let processor = self.clone(); let process_fn = move |send_idle_on_drop| { - processor.handle_blobs_by_range_request( - send_idle_on_drop, - peer_id, - request_id, - request, - ) + processor.handle_blobs_by_range_request(send_idle_on_drop, peer_id, request_id, request) }; self.try_send(BeaconWorkEvent { @@ -586,12 +579,7 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { let processor = self.clone(); let process_fn = move |send_idle_on_drop| { - processor.handle_blobs_by_root_request( - send_idle_on_drop, - peer_id, - request_id, - request, - ) + processor.handle_blobs_by_root_request(send_idle_on_drop, peer_id, request_id, request) }; self.try_send(BeaconWorkEvent { diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 3a09373c56..8d9146e688 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -6,8 +6,7 @@ use crate::sync::{ manager::{BlockProcessType, SyncMessage}, ChainId, }; -use beacon_chain::data_availability_checker::MaybeAvailableBlock; -use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; +use beacon_chain::blob_verification::{AsBlock, BlockWrapper, MaybeAvailableBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::{ observed_block_producers::Error as ObserveError, validator_monitor::get_block_delay_ms, @@ -55,7 +54,7 @@ impl NetworkBeaconProcessor { pub fn generate_rpc_beacon_block_process_fn( self: Arc, block_root: Hash256, - block: RpcBlock, + block: BlockWrapper, seen_timestamp: Duration, process_type: BlockProcessType, ) -> AsyncFn { @@ -79,7 +78,7 @@ impl NetworkBeaconProcessor { pub fn generate_rpc_beacon_block_fns( self: Arc, block_root: Hash256, - block: RpcBlock, + block: BlockWrapper, seen_timestamp: Duration, process_type: BlockProcessType, ) -> (AsyncFn, BlockingFn) { @@ -107,7 +106,7 @@ impl NetworkBeaconProcessor { pub async fn process_rpc_block( self: Arc>, block_root: Hash256, - block: RpcBlock, + block: BlockWrapper, seen_timestamp: Duration, process_type: BlockProcessType, reprocess_tx: mpsc::Sender, @@ -270,7 +269,8 @@ impl NetworkBeaconProcessor { process_type: BlockProcessType, ) -> AsyncFn { let process_fn = async move { - self.clone().process_rpc_blobs(block_root, block, seen_timestamp, process_type) + self.clone() + .process_rpc_blobs(block_root, block, seen_timestamp, process_type) .await; }; Box::pin(process_fn) @@ -306,7 +306,7 @@ impl NetworkBeaconProcessor { }); } - pub fn send_delayed_lookup(&self, block_root: Hash256){ + pub fn send_delayed_lookup(&self, block_root: Hash256) { self.send_sync_message(SyncMessage::MissingGossipBlockComponentsDelayed(block_root)) } @@ -315,7 +315,7 @@ impl NetworkBeaconProcessor { pub async fn process_chain_segment( &self, sync_type: ChainSegmentProcessId, - downloaded_blocks: Vec>, + downloaded_blocks: Vec>, notify_execution_layer: NotifyExecutionLayer, ) { let result = match sync_type { @@ -440,7 +440,7 @@ impl NetworkBeaconProcessor { /// Helper function to process blocks batches which only consumes the chain and blocks to process. async fn process_blocks<'a>( &self, - downloaded_blocks: impl Iterator>, + downloaded_blocks: impl Iterator>, notify_execution_layer: NotifyExecutionLayer, ) -> (usize, Result<(), ChainSegmentFailed>) { let blocks: Vec<_> = downloaded_blocks.cloned().collect(); @@ -473,7 +473,7 @@ impl NetworkBeaconProcessor { /// Helper function to process backfill block batches which only consumes the chain and blocks to process. fn process_backfill_blocks( &self, - downloaded_blocks: Vec>, + downloaded_blocks: Vec>, ) -> (usize, Result<(), ChainSegmentFailed>) { let total_blocks = downloaded_blocks.len(); let available_blocks = match downloaded_blocks @@ -481,7 +481,7 @@ impl NetworkBeaconProcessor { .map(|block| { self.chain .data_availability_checker - .check_rpc_block_availability(block) + .check_availability(block) }) .collect::, _>>() { diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 39d3575d6e..2c37d177aa 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -336,14 +336,16 @@ impl TestRig { } pub fn enqueue_blobs_by_range_request(&self, count: u64) { - self.network_beacon_processor.send_blobs_by_range_request( - PeerId::random(), - (ConnectionId::new(42), SubstreamId::new(24)), - BlobsByRangeRequest { - start_slot: 0, - count, - }, - ).unwrap(); + self.network_beacon_processor + .send_blobs_by_range_request( + PeerId::random(), + (ConnectionId::new(42), SubstreamId::new(24)), + BlobsByRangeRequest { + start_slot: 0, + count, + }, + ) + .unwrap(); } pub fn enqueue_backfill_batch(&self) { diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 5a954d05a4..30a75a9105 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -209,18 +209,12 @@ impl Router { .send_blocks_by_roots_request(peer_id, request_id, request), ), Request::BlobsByRange(request) => self.handle_beacon_processor_send_result( - self.network_beacon_processor.send_blobs_by_range_request( - peer_id, - request_id, - request, - ), + self.network_beacon_processor + .send_blobs_by_range_request(peer_id, request_id, request), ), Request::BlobsByRoot(request) => self.handle_beacon_processor_send_result( - self.network_beacon_processor.send_blobs_by_roots_request( - peer_id, - request_id, - request, - ), + self.network_beacon_processor + .send_blobs_by_roots_request(peer_id, request_id, request), ), Request::LightClientBootstrap(request) => self.handle_beacon_processor_send_result( self.network_beacon_processor @@ -311,7 +305,7 @@ impl Router { blob_index, signed_blob, timestamp_now(), - ) + ), ) } PubsubMessage::VoluntaryExit(exit) => { diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index 10110aa891..544c5dd9c7 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -3,6 +3,7 @@ mod tests { use crate::persisted_dht::load_dht; use crate::{NetworkConfig, NetworkService}; + use beacon_chain::test_utils::EphemeralHarnessType; use beacon_processor::{ BeaconProcessorSend, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN, }; @@ -12,7 +13,6 @@ mod tests { use std::str::FromStr; use std::sync::Arc; use tokio::{runtime::Runtime, sync::mpsc}; - use beacon_chain::test_utils::EphemeralHarnessType; use types::MinimalEthSpec as E; type BeaconChainHarness = beacon_chain::test_utils::BeaconChainHarness>; diff --git a/beacon_node/network/src/sync/block_lookups/delayed_lookup.rs b/beacon_node/network/src/sync/block_lookups/delayed_lookup.rs index c492470b4a..55e9e49db3 100644 --- a/beacon_node/network/src/sync/block_lookups/delayed_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/delayed_lookup.rs @@ -1,12 +1,12 @@ +use crate::network_beacon_processor::NetworkBeaconProcessor; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use slog::{crit, }; +use slog::crit; use slot_clock::SlotClock; use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::interval_at; use tokio::time::Instant; -use types::{ Hash256}; -use crate::network_beacon_processor::NetworkBeaconProcessor; +use types::Hash256; #[derive(Debug)] pub enum DelayedLookupMessage { diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index ff095c719e..dfe960832a 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -10,7 +10,7 @@ use super::{ use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::block_lookups::single_block_lookup::LookupId; -use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; +use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; use lighthouse_network::rpc::RPCError; @@ -34,7 +34,7 @@ mod single_block_lookup; #[cfg(test)] mod tests; -pub type DownloadedBlocks = (Hash256, RpcBlock); +pub type DownloadedBlocks = (Hash256, BlockWrapper); pub type RootBlockTuple = (Hash256, Arc>); pub type RootBlobsTuple = (Hash256, FixedBlobSidecarList); @@ -381,7 +381,7 @@ impl BlockLookups { if !has_pending_parent_request { let rpc_block = request_ref .get_downloaded_block() - .unwrap_or(RpcBlock::new_without_blobs(block)); + .unwrap_or(BlockWrapper::Block(block)); // This is the correct block, send it for processing match self.send_block_for_processing( block_root, @@ -910,7 +910,11 @@ impl BlockLookups { BlockError::ParentUnknown(block) => { let slot = block.slot(); let parent_root = block.parent_root(); - request_ref.add_unknown_parent_components(block.into()); + let (block, blobs) = block.deconstruct(); + request_ref.add_unknown_parent_components(UnknownParentComponents::new( + Some(block), + blobs, + )); self.search_parent(slot, root, parent_root, peer_id.to_peer_id(), cx); ShouldRemoveLookup::False } @@ -1222,7 +1226,7 @@ impl BlockLookups { fn send_block_for_processing( &mut self, block_root: Hash256, - block: RpcBlock, + block: BlockWrapper, duration: Duration, process_type: BlockProcessType, cx: &mut SyncNetworkContext, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 8bdf57e2a4..93b7c9af5b 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -127,7 +127,7 @@ pub enum SyncMessage { }, /// A block with an unknown parent has been received. - UnknownParentBlock(PeerId, RpcBlock, Hash256), + UnknownParentBlock(PeerId, BlockWrapper, Hash256), /// A blob with an unknown parent has been received. UnknownParentBlob(PeerId, Arc>), diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 09a85208d9..733cbcc9ea 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -396,7 +396,7 @@ mod tests { use tokio::sync::mpsc; use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType}; - use slot_clock::{TestingSlotClock, }; + use slot_clock::TestingSlotClock; use std::collections::HashSet; use std::sync::Arc; use store::MemoryStore;