From e27161ed2171e34527d17cb8067c0a16a684e132 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 25 Jul 2023 16:18:59 -0400 Subject: [PATCH] get tests compiling --- .../network_beacon_processor/sync_methods.rs | 4 - .../network/src/sync/block_lookups/mod.rs | 71 +-- .../src/sync/block_lookups/parent_lookup.rs | 20 +- .../sync/block_lookups/single_block_lookup.rs | 114 +++-- .../network/src/sync/block_lookups/tests.rs | 428 +++++++++++------- beacon_node/network/src/sync/manager.rs | 40 +- .../network/src/sync/network_context.rs | 17 +- 7 files changed, 388 insertions(+), 306 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index b21bc6abde..a1176bebbc 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -1,6 +1,5 @@ use crate::metrics; use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE}; -use crate::sync::manager::ResponseType; use crate::sync::BatchProcessResult; use crate::sync::{ manager::{BlockProcessType, SyncMessage}, @@ -96,7 +95,6 @@ impl NetworkBeaconProcessor { self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, result: crate::sync::manager::BlockProcessingResult::Ignored, - response_type: crate::sync::manager::ResponseType::Block, }); }; (process_fn, Box::new(ignore_fn)) @@ -251,7 +249,6 @@ impl NetworkBeaconProcessor { self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, result: result.into(), - response_type: ResponseType::Block, }); // Drop the handle to remove the entry from the cache @@ -303,7 +300,6 @@ impl NetworkBeaconProcessor { self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, result: result.into(), - response_type: ResponseType::Blob, }); } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index d9184cee59..79ef277468 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -5,7 +5,9 @@ use super::BatchProcessResult; use super::{manager::BlockProcessType, network_context::SyncNetworkContext}; use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; -use crate::sync::block_lookups::single_block_lookup::LookupId; +use crate::sync::block_lookups::parent_lookup::ParentLookup; +use crate::sync::block_lookups::single_block_lookup::LookupVerifyError; +use crate::sync::manager::{Id, ResponseType}; use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; @@ -26,7 +28,7 @@ use std::time::Duration; use store::{Hash256, SignedBeaconBlock}; use strum::Display; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlobSidecar, EthSpec, Slot}; +use types::Slot; pub(crate) mod delayed_lookup; mod parent_lookup; @@ -116,7 +118,7 @@ impl BlockLookups { &mut self, block_root: Hash256, peer_source: PeerShouldHave, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let lookup = self.new_current_lookup(block_root, None, &[peer_source], cx); if let Some(lookup) = lookup { @@ -131,7 +133,7 @@ impl BlockLookups { &mut self, block_root: Hash256, peer_source: PeerShouldHave, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let lookup = self.new_current_lookup(block_root, None, &[peer_source], cx); if let Some(lookup) = lookup { @@ -150,7 +152,7 @@ impl BlockLookups { block_root: Hash256, parent_components: Option>, peer_source: &[PeerShouldHave], - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let lookup = self.new_current_lookup(block_root, parent_components, peer_source, cx); if let Some(lookup) = lookup { @@ -170,7 +172,7 @@ impl BlockLookups { block_root: Hash256, parent_components: Option>, peer_source: &[PeerShouldHave], - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let lookup = self.new_current_lookup(block_root, parent_components, peer_source, cx); if let Some(lookup) = lookup { @@ -207,10 +209,11 @@ impl BlockLookups { cx: &SyncNetworkContext, ) -> Result<(), ()> { for (_, lookup) in self.single_block_lookups.iter_mut() { - if lookup.block_request_state.requested_block_root == block_root && !lookup.triggered { - if lookup.request_block_and_blobs(cx).is_ok() { - lookup.triggered = true; - } + if lookup.block_request_state.requested_block_root == block_root + && !lookup.triggered + && lookup.request_block_and_blobs(cx).is_ok() + { + lookup.triggered = true; } } Ok(()) @@ -228,13 +231,13 @@ impl BlockLookups { block_root: Hash256, parent_components: Option>, peers: &[PeerShouldHave], - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) -> Option> { // Do not re-request a block that is already being requested if let Some((_, lookup)) = self .single_block_lookups .iter_mut() - .find(|(id, lookup)| lookup.is_for_block(block_root)) + .find(|(_id, lookup)| lookup.is_for_block(block_root)) { lookup.add_peers(peers); if let Some(components) = parent_components { @@ -274,7 +277,7 @@ impl BlockLookups { parent_components, peers, self.da_checker.clone(), - cx, + cx.next_id(), )) } @@ -286,7 +289,7 @@ impl BlockLookups { block_root: Hash256, parent_root: Hash256, peer_id: PeerId, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { // Gossip blocks or blobs shouldn't be propagated if parents are unavailable. let peer_source = PeerShouldHave::BlockAndBlobs(peer_id); @@ -383,17 +386,15 @@ impl BlockLookups { } } } - } else { - if let Err(()) = R::send_for_processing( - id, - self, - root, - R::verified_to_reconstructed(verified_response), - seen_timestamp, - &cx, - ) { - self.single_block_lookups.remove(&id); - } + } else if let Err(()) = R::send_for_processing( + id, + self, + root, + R::verified_to_reconstructed(verified_response), + seen_timestamp, + cx, + ) { + self.single_block_lookups.remove(&id); } } Ok(None) => {} @@ -616,7 +617,13 @@ impl BlockLookups { let request_state = R::request_state_mut(lookup); request_state.register_failure_downloading(); let response_type = R::response_type(); - trace!(self.log, "Single lookup failed"; "block_root" => ?root, "error" => msg, "response_type" => ?response_type); + trace!(self.log, + "Single lookup failed"; + "block_root" => ?root, + "error" => msg, + "peer_id" => %peer_id, + "response_type" => ?response_type + ); if let Err(()) = request_state.retry_request_after_failure(id, cx, &self.log) { self.single_block_lookups.remove(&id); }; @@ -633,7 +640,7 @@ impl BlockLookups { &mut self, target_id: Id, result: BlockProcessingResult, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let Some(request_ref) = self.single_block_lookups.get_mut(&target_id) else { debug!(self.log, "Block component processed for single block lookup not present" ); @@ -793,8 +800,7 @@ impl BlockLookups { &mut self, chain_hash: Hash256, result: BlockProcessingResult, - response_type: ResponseType, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let index = self .parent_lookups @@ -865,7 +871,8 @@ impl BlockLookups { // to send for processing. if let Some(child_lookup_id) = self.single_block_lookups.iter().find_map(|(id, lookup)| { - (lookup.block_request_state.requested_block_root == chain_hash).then(|| *id) + (lookup.block_request_state.requested_block_root == chain_hash) + .then_some(*id) }) { let Some(child_lookup) = self.single_block_lookups.get_mut(&child_lookup_id) else { @@ -972,7 +979,7 @@ impl BlockLookups { .single_block_lookups .iter() .find_map(|(id, req)| - (req.block_request_state.requested_block_root == chain_hash).then(|| *id)) else { + (req.block_request_state.requested_block_root == chain_hash).then_some(*id)) else { warn!(self.log, "No id found for single block lookup"; "chain_hash" => %chain_hash); return; }; @@ -991,7 +998,7 @@ impl BlockLookups { chain_hash, rpc_block, Duration::from_secs(0), //TODO(sean) pipe this through - BlockProcessType::SingleBlock { id: id }, + BlockProcessType::SingleBlock { id }, cx, ) .is_err() diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 1df51bf7f0..6afeaccc7b 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -3,7 +3,6 @@ use super::{DownloadedBlocks, PeerShouldHave}; use crate::sync::block_lookups::single_block_lookup::{ Parent, RequestState, State, UnknownParentComponents, }; -use crate::sync::block_lookups::Lookup; use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::RpcBlock; @@ -14,8 +13,6 @@ use lighthouse_network::PeerId; use std::sync::Arc; use store::Hash256; use strum::IntoStaticStr; -use types::blob_sidecar::FixedBlobSidecarList; -use types::SignedBeaconBlock; /// How many attempts we try to find a parent of a block before we give up trying. pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5; @@ -67,14 +64,14 @@ impl ParentLookup { parent_root: Hash256, peer_id: PeerShouldHave, da_checker: Arc>, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) -> Self { let current_parent_request = SingleBlockLookup::new( parent_root, Some(<_>::default()), &[peer_id], da_checker, - cx, + cx.next_id(), ); Self { @@ -144,16 +141,6 @@ impl ParentLookup { Some(UnknownParentComponents::default()); } - pub fn add_current_request_block(&mut self, block: Arc>) { - // Cache the block. - self.current_parent_request.add_unknown_parent_block(block); - } - - pub fn add_current_request_blobs(&mut self, blobs: FixedBlobSidecarList) { - // Cache the blobs. - self.current_parent_request.add_unknown_parent_blobs(blobs); - } - pub fn processing_peer(&self) -> Result { self.current_parent_request .block_request_state @@ -234,8 +221,7 @@ impl ParentLookup { // be dropped and the peer downscored. if let Some(parent_root) = root_and_block .as_ref() - .map(|(_, block)| R::get_parent_root(block)) - .flatten() + .and_then(|(_, block)| R::get_parent_root(block)) { if failed_chains.contains(&parent_root) { request_state.register_failure_downloading(); diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index e6e1e72e72..5499c66986 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,5 +1,4 @@ use super::{PeerShouldHave, ResponseType}; -use crate::sync::block_lookups::parent_lookup::RequestError::SendFailed; use crate::sync::block_lookups::parent_lookup::PARENT_FAIL_TOLERANCE; use crate::sync::block_lookups::{ BlockLookups, Id, LookupType, RootBlobsTuple, RootBlockTuple, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, @@ -9,7 +8,6 @@ use crate::sync::network_context::SyncNetworkContext; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::{get_block_root, BeaconChainTypes}; -use itertools::Itertools; use lighthouse_network::rpc::methods::BlobsByRootRequest; use lighthouse_network::{rpc::BlocksByRootRequest, PeerId}; use rand::seq::IteratorRandom; @@ -95,9 +93,6 @@ pub trait RequestState { fn processing_peer(&self) -> Result { self.get_state().processing_peer() } - fn downloading_peer(&self) -> Result { - self.get_state().peer() - } fn set_component_processed(&mut self) { self.get_state_mut().component_processed = true; } @@ -185,7 +180,7 @@ pub trait RequestState { request_state.failed_processing >= request_state.failed_downloading } fn get_peer(&mut self) -> Result { - let mut request_state = self.get_state_mut(); + let request_state = self.get_state_mut(); let Some(peer_id) = request_state .available_peers .iter() @@ -441,7 +436,7 @@ impl BlobRequestState { requested_ids: <_>::default(), blob_download_queue: <_>::default(), state: SingleLookupRequestState::new(peer_source), - _phantom: PhantomData::default(), + _phantom: PhantomData, } } } @@ -457,7 +452,7 @@ impl BlockRequestState { Self { requested_block_root: block_root, state: SingleLookupRequestState::new(peers), - _phantom: PhantomData::default(), + _phantom: PhantomData, } } } @@ -585,10 +580,10 @@ impl SingleBlockLookup { unknown_parent_components: Option>, peers: &[PeerShouldHave], da_checker: Arc>, - cx: &SyncNetworkContext, + id: Id, ) -> Self { Self { - id: cx.next_id(), + id, block_request_state: BlockRequestState::new(requested_block_root, peers), blob_request_state: BlobRequestState::new(peers), da_checker, @@ -732,27 +727,6 @@ impl SingleBlockLookup { self.unknown_parent_components = Some(components); } } - pub fn add_unknown_parent_block(&mut self, block: Arc>) { - if let Some(ref mut components) = self.unknown_parent_components { - components.add_unknown_parent_block(block) - } else { - self.unknown_parent_components = Some(UnknownParentComponents { - downloaded_block: Some(block), - downloaded_blobs: FixedBlobSidecarList::default(), - }) - } - } - - pub fn add_unknown_parent_blobs(&mut self, blobs: FixedBlobSidecarList) { - if let Some(ref mut components) = self.unknown_parent_components { - components.add_unknown_parent_blobs(blobs) - } else { - self.unknown_parent_components = Some(UnknownParentComponents { - downloaded_block: None, - downloaded_blobs: blobs, - }) - } - } pub fn add_peers(&mut self, peers: &[PeerShouldHave]) { for peer in peers { @@ -886,14 +860,6 @@ impl SingleLookupRequestState { } } - pub fn peer(&self) -> Result { - match &self.state { - State::Processing { peer_id } => Ok(*peer_id), - State::Downloading { peer_id } => Ok(*peer_id), - _ => Err(()), - } - } - pub fn remove_peer_if_useless(&mut self, peer_id: &PeerId) { if !self.available_peers.is_empty() || self.potential_peers.len() > 1 { self.potential_peers.remove(peer_id); @@ -981,6 +947,26 @@ mod tests { } type T = Witness, E, MemoryStore, MemoryStore>; + struct TestLookup1; + + impl Lookup for TestLookup1 { + const MAX_ATTEMPTS: u8 = 3; + + fn lookup_type() -> LookupType { + panic!() + } + } + + struct TestLookup2; + + impl Lookup for TestLookup2 { + const MAX_ATTEMPTS: u8 = 4; + + fn lookup_type() -> LookupType { + panic!() + } + } + #[test] fn test_happy_path() { let peer_id = PeerShouldHave::BlockAndBlobs(PeerId::random()); @@ -998,15 +984,28 @@ mod tests { DataAvailabilityChecker::new(slot_clock, None, store.into(), spec) .expect("data availability checker"), ); - let mut sl = - SingleBlockLookup::<4, T>::new(block.canonical_root(), None, &[peer_id], da_checker); - sl.request_block().unwrap(); - sl.verify_block(Some(block.into())).unwrap().unwrap(); + let mut sl = SingleBlockLookup::::new( + block.canonical_root(), + None, + &[peer_id], + da_checker, + 1, + ); + as RequestState>::build_request( + &mut sl.block_request_state, + ) + .unwrap(); + as RequestState>::verify_response( + &mut sl.block_request_state, + block.canonical_root(), + Some(block.into()), + ) + .unwrap() + .unwrap(); } #[test] fn test_block_lookup_failures() { - const FAILURES: u8 = 3; let peer_id = PeerShouldHave::BlockAndBlobs(PeerId::random()); let block = rand_block(); let spec = E::default_spec(); @@ -1024,25 +1023,40 @@ mod tests { .expect("data availability checker"), ); - let mut sl = SingleBlockLookup::::new( + let mut sl = SingleBlockLookup::::new( block.canonical_root(), None, &[peer_id], da_checker, + 1, ); - for _ in 1..FAILURES { - sl.request_block().unwrap(); + for _ in 1..TestLookup2::MAX_ATTEMPTS { + as RequestState>::build_request( + &mut sl.block_request_state, + ) + .unwrap(); sl.block_request_state.state.register_failure_downloading(); } // Now we receive the block and send it for processing - sl.request_block().unwrap(); - sl.verify_block(Some(block.into())).unwrap().unwrap(); + as RequestState>::build_request( + &mut sl.block_request_state, + ) + .unwrap(); + as RequestState>::verify_response( + &mut sl.block_request_state, + block.canonical_root(), + Some(block.into()), + ) + .unwrap() + .unwrap(); // One processing failure maxes the available attempts sl.block_request_state.state.register_failure_processing(); assert_eq!( - sl.request_block(), + as RequestState>::build_request( + &mut sl.block_request_state + ), Err(LookupRequestError::TooManyAttempts { cannot_process: false }) diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index c12f101a3e..d7c9688e50 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -20,7 +20,8 @@ use tokio::sync::mpsc; use types::{ map_fork_name, map_fork_name_with, test_utils::{SeedableRng, TestRandom, XorShiftRng}, - BeaconBlock, EthSpec, ForkName, FullPayloadDeneb, MinimalEthSpec as E, SignedBeaconBlock, + BeaconBlock, BlobSidecar, EthSpec, ForkName, FullPayloadDeneb, MinimalEthSpec as E, + SignedBeaconBlock, }; type T = Witness, E, MemoryStore, MemoryStore>; @@ -304,7 +305,13 @@ fn test_single_block_lookup_happy_path() { // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. - bl.single_lookup_response(id, peer_id, Some(block.into()), D, &mut cx); + bl.single_lookup_response::>( + id, + peer_id, + Some(block.into()), + D, + &cx, + ); rig.expect_empty_network(); rig.expect_block_process(response_type); @@ -313,11 +320,10 @@ fn test_single_block_lookup_happy_path() { // Send the stream termination. Peer should have not been penalized, and the request removed // after processing. - bl.single_lookup_response(id, peer_id, None, D, &mut cx); - bl.single_block_component_processed( + bl.single_lookup_response::>(id, peer_id, None, D, &cx); + bl.single_block_component_processed::>( id, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - response_type, &mut cx, ); rig.expect_empty_network(); @@ -346,7 +352,7 @@ fn test_single_block_lookup_empty_response() { } // The peer does not have the block. It should be penalized. - bl.single_lookup_response(id, peer_id, None, D, &mut cx); + bl.single_lookup_response::>(id, peer_id, None, D, &cx); rig.expect_penalty(); rig.expect_block_request(response_type); // it should be retried @@ -375,12 +381,18 @@ fn test_single_block_lookup_wrong_response() { // Peer sends something else. It should be penalized. let bad_block = rig.rand_block(fork_name); - bl.single_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); + bl.single_lookup_response::>( + id, + peer_id, + Some(bad_block.into()), + D, + &cx, + ); rig.expect_penalty(); rig.expect_block_request(response_type); // should be retried // Send the stream termination. This should not produce an additional penalty. - bl.single_lookup_response(id, peer_id, None, D, &mut cx); + bl.single_lookup_response::>(id, peer_id, None, D, &cx); rig.expect_empty_network(); } @@ -406,7 +418,12 @@ fn test_single_block_lookup_failure() { } // The request fails. RPC failures are handled elsewhere so we should not penalize the peer. - bl.single_block_lookup_failed(id, &peer_id, &mut cx, RPCError::UnsupportedProtocol); + bl.single_block_lookup_failed::>( + id, + &peer_id, + &cx, + RPCError::UnsupportedProtocol, + ); rig.expect_block_request(response_type); rig.expect_empty_network(); } @@ -438,7 +455,13 @@ fn test_single_block_lookup_becomes_parent_request() { // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. - bl.single_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); + bl.single_lookup_response::>( + id, + peer_id, + Some(block.clone()), + D, + &cx, + ); rig.expect_empty_network(); rig.expect_block_process(response_type); @@ -447,10 +470,9 @@ fn test_single_block_lookup_becomes_parent_request() { // Send the stream termination. Peer should have not been penalized, and the request moved to a // parent request after processing. - bl.single_block_component_processed( + bl.single_block_component_processed::>( id, BlockError::ParentUnknown(block.into()).into(), - response_type, &mut cx, ); assert_eq!(bl.single_block_lookups.len(), 1); @@ -491,22 +513,23 @@ fn test_parent_lookup_happy_path() { } // Peer sends the right block, it should be sent for processing. Peer should not be penalized. - bl.parent_lookup_response(id, peer_id, Some(parent.into()), D, &mut cx); + bl.parent_lookup_response::>( + id, + peer_id, + Some(parent.into()), + D, + &cx, + ); rig.expect_block_process(response_type); rig.expect_empty_network(); // Processing succeeds, now the rest of the chain should be sent for processing. - bl.parent_block_processed( - chain_hash, - BlockError::BlockIsAlreadyKnown.into(), - response_type, - &mut cx, - ); + bl.parent_block_processed(chain_hash, BlockError::BlockIsAlreadyKnown.into(), &mut cx); rig.expect_parent_chain_process(); let process_result = BatchProcessResult::Success { was_non_empty: true, }; - bl.parent_chain_processed(chain_hash, process_result, &mut cx); + bl.parent_chain_processed(chain_hash, process_result, &cx); assert_eq!(bl.parent_lookups.len(), 0); } @@ -538,30 +561,41 @@ fn test_parent_lookup_wrong_response() { // Peer sends the wrong block, peer should be penalized and the block re-requested. let bad_block = rig.rand_block(fork_name); - bl.parent_lookup_response(id1, peer_id, Some(bad_block.into()), D, &mut cx); + bl.parent_lookup_response::>( + id1, + peer_id, + Some(bad_block.into()), + D, + &cx, + ); rig.expect_penalty(); let id2 = rig.expect_parent_request(response_type); // Send the stream termination for the first request. This should not produce extra penalties. - bl.parent_lookup_response(id1, peer_id, None, D, &mut cx); + bl.parent_lookup_response::>(id1, peer_id, None, D, &cx); rig.expect_empty_network(); // Send the right block this time. - bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx); + bl.parent_lookup_response::>( + id2, + peer_id, + Some(parent.into()), + D, + &cx, + ); rig.expect_block_process(response_type); // Processing succeeds, now the rest of the chain should be sent for processing. bl.parent_block_processed( chain_hash, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - response_type, &mut cx, ); rig.expect_parent_chain_process(); let process_result = BatchProcessResult::Success { was_non_empty: true, }; - bl.parent_chain_processed(chain_hash, process_result, &mut cx); + bl.parent_chain_processed(chain_hash, process_result, &cx); assert_eq!(bl.parent_lookups.len(), 0); } @@ -592,26 +626,31 @@ fn test_parent_lookup_empty_response() { } // Peer sends an empty response, peer should be penalized and the block re-requested. - bl.parent_lookup_response(id1, peer_id, None, D, &mut cx); + bl.parent_lookup_response::>(id1, peer_id, None, D, &cx); rig.expect_penalty(); let id2 = rig.expect_parent_request(response_type); // Send the right block this time. - bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx); + bl.parent_lookup_response::>( + id2, + peer_id, + Some(parent.into()), + D, + &cx, + ); rig.expect_block_process(response_type); // Processing succeeds, now the rest of the chain should be sent for processing. bl.parent_block_processed( chain_hash, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - response_type, &mut cx, ); rig.expect_parent_chain_process(); let process_result = BatchProcessResult::Success { was_non_empty: true, }; - bl.parent_chain_processed(chain_hash, process_result, &mut cx); + bl.parent_chain_processed(chain_hash, process_result, &cx); assert_eq!(bl.parent_lookups.len(), 0); } @@ -642,10 +681,10 @@ fn test_parent_lookup_rpc_failure() { } // The request fails. It should be tried again. - bl.parent_lookup_failed( + bl.parent_lookup_failed::>( id1, peer_id, - &mut cx, + &cx, RPCError::ErrorResponse( RPCResponseErrorCode::ResourceUnavailable, "older than deneb".into(), @@ -654,21 +693,26 @@ fn test_parent_lookup_rpc_failure() { let id2 = rig.expect_parent_request(response_type); // Send the right block this time. - bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx); + bl.parent_lookup_response::>( + id2, + peer_id, + Some(parent.into()), + D, + &cx, + ); rig.expect_block_process(response_type); // Processing succeeds, now the rest of the chain should be sent for processing. bl.parent_block_processed( chain_hash, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - response_type, &mut cx, ); rig.expect_parent_chain_process(); let process_result = BatchProcessResult::Success { was_non_empty: true, }; - bl.parent_chain_processed(chain_hash, process_result, &mut cx); + bl.parent_chain_processed(chain_hash, process_result, &cx); assert_eq!(bl.parent_lookups.len(), 0); } @@ -701,10 +745,10 @@ fn test_parent_lookup_too_many_attempts() { // make sure every error is accounted for 0 => { // The request fails. It should be tried again. - bl.parent_lookup_failed( + bl.parent_lookup_failed::>( id, peer_id, - &mut cx, + &cx, RPCError::ErrorResponse( RPCResponseErrorCode::ResourceUnavailable, "older than deneb".into(), @@ -714,9 +758,15 @@ fn test_parent_lookup_too_many_attempts() { _ => { // Send a bad block this time. It should be tried again. let bad_block = rig.rand_block(fork_name); - bl.parent_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); + bl.parent_lookup_response::>( + id, + peer_id, + Some(bad_block.into()), + D, + &cx, + ); // Send the stream termination - bl.parent_lookup_response(id, peer_id, None, D, &mut cx); + bl.parent_lookup_response::>(id, peer_id, None, D, &cx); rig.expect_penalty(); } } @@ -764,10 +814,10 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() { } if i % 2 != 0 { // The request fails. It should be tried again. - bl.parent_lookup_failed( + bl.parent_lookup_failed::>( id, peer_id, - &mut cx, + &cx, RPCError::ErrorResponse( RPCResponseErrorCode::ResourceUnavailable, "older than deneb".into(), @@ -776,7 +826,13 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() { } else { // Send a bad block this time. It should be tried again. let bad_block = rig.rand_block(fork_name); - bl.parent_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); + bl.parent_lookup_response::>( + id, + peer_id, + Some(bad_block.into()), + D, + &cx, + ); rig.expect_penalty(); } if i < parent_lookup::PARENT_FAIL_TOLERANCE { @@ -825,10 +881,10 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { let _ = rig.expect_parent_request(ResponseType::Blob); } // The request fails. It should be tried again. - bl.parent_lookup_failed( + bl.parent_lookup_failed::>( id, peer_id, - &mut cx, + &cx, RPCError::ErrorResponse( RPCResponseErrorCode::ResourceUnavailable, "older than deneb".into(), @@ -846,14 +902,15 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { // we don't require a response because we're generateing 0-blob blocks in this test. assert!(!bl.failed_chains.contains(&block_root)); // send the right parent but fail processing - bl.parent_lookup_response(id, peer_id, Some(parent.clone()), D, &mut cx); - bl.parent_block_processed( - block_root, - BlockError::InvalidSignature.into(), - response_type, - &mut cx, + bl.parent_lookup_response::>( + id, + peer_id, + Some(parent.clone()), + D, + &cx, ); - bl.parent_lookup_response(id, peer_id, None, D, &mut cx); + bl.parent_block_processed(block_root, BlockError::InvalidSignature.into(), &mut cx); + bl.parent_lookup_response::>(id, peer_id, None, D, &cx); rig.expect_penalty(); } @@ -902,16 +959,21 @@ fn test_parent_lookup_too_deep() { let _ = rig.expect_parent_request(ResponseType::Blob); } // the block - bl.parent_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); + bl.parent_lookup_response::>( + id, + peer_id, + Some(block.clone()), + D, + &cx, + ); // the stream termination - bl.parent_lookup_response(id, peer_id, None, D, &mut cx); + bl.parent_lookup_response::>(id, peer_id, None, D, &cx); // the processing request rig.expect_block_process(response_type); // the processing result bl.parent_block_processed( chain_hash, BlockError::ParentUnknown(block.into()).into(), - response_type, &mut cx, ) } @@ -971,7 +1033,13 @@ fn test_single_block_lookup_ignored_response() { // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. - bl.single_lookup_response(id, peer_id, Some(block.into()), D, &mut cx); + bl.single_lookup_response::>( + id, + peer_id, + Some(block.into()), + D, + &cx, + ); rig.expect_empty_network(); rig.expect_block_process(response_type); @@ -980,9 +1048,13 @@ fn test_single_block_lookup_ignored_response() { // Send the stream termination. Peer should have not been penalized, and the request removed // after processing. - bl.single_lookup_response(id, peer_id, None, D, &mut cx); + bl.single_lookup_response::>(id, peer_id, None, D, &cx); // Send an Ignored response, the request should be dropped - bl.single_block_component_processed(id, BlockProcessingResult::Ignored, response_type, &mut cx); + bl.single_block_component_processed::>( + id, + BlockProcessingResult::Ignored, + &mut cx, + ); rig.expect_empty_network(); assert_eq!(bl.single_block_lookups.len(), 0); } @@ -1015,17 +1087,18 @@ fn test_parent_lookup_ignored_response() { } // Peer sends the right block, it should be sent for processing. Peer should not be penalized. - bl.parent_lookup_response(id, peer_id, Some(parent.into()), D, &mut cx); + bl.parent_lookup_response::>( + id, + peer_id, + Some(parent.into()), + D, + &cx, + ); rig.expect_block_process(response_type); rig.expect_empty_network(); // Return an Ignored result. The request should be dropped - bl.parent_block_processed( - chain_hash, - BlockProcessingResult::Ignored, - response_type, - &mut cx, - ); + bl.parent_block_processed(chain_hash, BlockProcessingResult::Ignored, &mut cx); rig.expect_empty_network(); assert_eq!(bl.parent_lookups.len(), 0); } @@ -1092,25 +1165,25 @@ fn test_same_chain_race_condition() { let _ = rig.expect_parent_request(ResponseType::Blob); } // the block - bl.parent_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); + bl.parent_lookup_response::>( + id, + peer_id, + Some(block.clone()), + D, + &cx, + ); // the stream termination - bl.parent_lookup_response(id, peer_id, None, D, &mut cx); + bl.parent_lookup_response::>(id, peer_id, None, D, &cx); // the processing request rig.expect_block_process(response_type); // the processing result if i + 2 == depth { // one block was removed - bl.parent_block_processed( - chain_hash, - BlockError::BlockIsAlreadyKnown.into(), - response_type, - &mut cx, - ) + bl.parent_block_processed(chain_hash, BlockError::BlockIsAlreadyKnown.into(), &mut cx) } else { bl.parent_block_processed( chain_hash, BlockError::ParentUnknown(block.into()).into(), - response_type, &mut cx, ) } @@ -1137,7 +1210,7 @@ fn test_same_chain_race_condition() { let process_result = BatchProcessResult::Success { was_non_empty: true, }; - bl.parent_chain_processed(chain_hash, process_result, &mut cx); + bl.parent_chain_processed(chain_hash, process_result, &cx); assert_eq!(bl.parent_lookups.len(), 0); } @@ -1307,12 +1380,12 @@ mod deneb_only { fn parent_block_response(mut self) -> Self { self.rig.expect_empty_network(); - self.bl.parent_lookup_response( + self.bl.parent_lookup_response::>( self.parent_block_req_id.expect("parent request id"), self.peer_id, self.parent_block.clone(), D, - &mut self.cx, + &self.cx, ); assert_eq!(self.bl.parent_lookups.len(), 1); @@ -1321,22 +1394,24 @@ mod deneb_only { fn parent_blob_response(mut self) -> Self { for blob in &self.parent_blobs { - self.bl.parent_lookup_blob_response( - self.parent_blob_req_id.expect("parent blob request id"), - self.peer_id, - Some(blob.clone()), - D, - &mut self.cx, - ); + self.bl + .parent_lookup_response::>( + self.parent_blob_req_id.expect("parent blob request id"), + self.peer_id, + Some(blob.clone()), + D, + &self.cx, + ); assert_eq!(self.bl.parent_lookups.len(), 1); } - self.bl.parent_lookup_blob_response( - self.parent_blob_req_id.expect("blob request id"), - self.peer_id, - None, - D, - &mut self.cx, - ); + self.bl + .parent_lookup_response::>( + self.parent_blob_req_id.expect("blob request id"), + self.peer_id, + None, + D, + &self.cx, + ); self } @@ -1353,13 +1428,14 @@ mod deneb_only { fn block_response(mut self) -> Self { // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. - self.bl.single_lookup_response( - self.block_req_id.expect("block request id"), - self.peer_id, - self.block.clone(), - D, - &mut self.cx, - ); + self.bl + .single_lookup_response::>( + self.block_req_id.expect("block request id"), + self.peer_id, + self.block.clone(), + D, + &self.cx, + ); self.rig.expect_empty_network(); // The request should still be active. @@ -1369,22 +1445,24 @@ mod deneb_only { fn blobs_response(mut self) -> Self { for blob in &self.blobs { - self.bl.single_blob_lookup_response( - self.blob_req_id.expect("blob request id"), - self.peer_id, - Some(blob.clone()), - D, - &mut self.cx, - ); + self.bl + .single_lookup_response::>( + self.blob_req_id.expect("blob request id"), + self.peer_id, + Some(blob.clone()), + D, + &self.cx, + ); assert_eq!(self.bl.single_block_lookups.len(), 1); } - self.bl.single_blob_lookup_response( - self.blob_req_id.expect("blob request id"), - self.peer_id, - None, - D, - &mut self.cx, - ); + self.bl + .single_lookup_response::>( + self.blob_req_id.expect("blob request id"), + self.peer_id, + None, + D, + &self.cx, + ); self } @@ -1402,58 +1480,63 @@ mod deneb_only { } fn empty_block_response(mut self) -> Self { - self.bl.single_lookup_response( - self.block_req_id.expect("block request id"), - self.peer_id, - None, - D, - &mut self.cx, - ); + self.bl + .single_lookup_response::>( + self.block_req_id.expect("block request id"), + self.peer_id, + None, + D, + &self.cx, + ); self } fn empty_blobs_response(mut self) -> Self { - self.bl.single_blob_lookup_response( - self.blob_req_id.expect("blob request id"), - self.peer_id, - None, - D, - &mut self.cx, - ); + self.bl + .single_lookup_response::>( + self.blob_req_id.expect("blob request id"), + self.peer_id, + None, + D, + &self.cx, + ); self } fn empty_parent_block_response(mut self) -> Self { - self.bl.parent_lookup_response( + self.bl.parent_lookup_response::>( self.parent_block_req_id.expect("block request id"), self.peer_id, None, D, - &mut self.cx, + &self.cx, ); self } fn empty_parent_blobs_response(mut self) -> Self { - self.bl.parent_lookup_blob_response( - self.parent_blob_req_id.expect("blob request id"), - self.peer_id, - None, - D, - &mut self.cx, - ); + self.bl + .parent_lookup_response::>( + self.parent_blob_req_id.expect("blob request id"), + self.peer_id, + None, + D, + &self.cx, + ); self } fn block_imported(mut self) -> Self { // Missing blobs should be the request is not removed, the outstanding blobs request should // mean we do not send a new request. - self.bl.single_block_component_processed( - self.block_req_id.expect("block request id"), - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), - ResponseType::Block, - &mut self.cx, - ); + self.bl + .single_block_component_processed::>( + self.block_req_id.expect("block request id"), + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported( + self.block_root, + )), + &mut self.cx, + ); self.rig.expect_empty_network(); assert_eq!(self.bl.single_block_lookups.len(), 0); self @@ -1463,7 +1546,6 @@ mod deneb_only { self.bl.parent_block_processed( self.block_root, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), - ResponseType::Block, &mut self.cx, ); self.rig.expect_empty_network(); @@ -1477,7 +1559,6 @@ mod deneb_only { BlockProcessingResult::Err(BlockError::ParentUnknown(RpcBlock::new_without_blobs( self.parent_block.clone().expect("parent block"), ))), - ResponseType::Block, &mut self.cx, ); assert_eq!(self.bl.parent_lookups.len(), 1); @@ -1488,7 +1569,6 @@ mod deneb_only { self.bl.parent_block_processed( self.block_root, BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), - ResponseType::Block, &mut self.cx, ); assert_eq!(self.bl.parent_lookups.len(), 1); @@ -1496,53 +1576,53 @@ mod deneb_only { } fn invalid_block_processed(mut self) -> Self { - self.bl.single_block_component_processed( - self.block_req_id.expect("block request id"), - BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), - ResponseType::Block, - &mut self.cx, - ); + self.bl + .single_block_component_processed::>( + self.block_req_id.expect("block request id"), + BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), + &mut self.cx, + ); assert_eq!(self.bl.single_block_lookups.len(), 1); self } fn invalid_blob_processed(mut self) -> Self { - self.bl.single_block_component_processed( - self.blob_req_id.expect("blob request id"), - BlockProcessingResult::Err(BlockError::BlobValidation( - BlobError::ProposerSignatureInvalid, - )), - ResponseType::Blob, - &mut self.cx, - ); + self.bl + .single_block_component_processed::>( + self.blob_req_id.expect("blob request id"), + BlockProcessingResult::Err(BlockError::BlobValidation( + BlobError::ProposerSignatureInvalid, + )), + &mut self.cx, + ); assert_eq!(self.bl.single_block_lookups.len(), 1); self } fn missing_components_from_block_request(mut self) -> Self { - self.bl.single_block_component_processed( - self.block_req_id.expect("block request id"), - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - self.slot, - self.block_root, - )), - ResponseType::Block, - &mut self.cx, - ); + self.bl + .single_block_component_processed::>( + self.block_req_id.expect("block request id"), + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + self.slot, + self.block_root, + )), + &mut self.cx, + ); assert_eq!(self.bl.single_block_lookups.len(), 1); self } fn missing_components_from_blob_request(mut self) -> Self { - self.bl.single_block_component_processed( - self.blob_req_id.expect("blob request id"), - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - self.slot, - self.block_root, - )), - ResponseType::Blob, - &mut self.cx, - ); + self.bl + .single_block_component_processed::>( + self.blob_req_id.expect("blob request id"), + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + self.slot, + self.block_root, + )), + &mut self.cx, + ); assert_eq!(self.bl.single_block_lookups.len(), 1); self } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 3106233658..cedad8890f 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -44,7 +44,7 @@ use crate::status::ToStatusMessage; use crate::sync::block_lookups::delayed_lookup; use crate::sync::block_lookups::delayed_lookup::DelayedLookupMessage; use crate::sync::block_lookups::{ - BlobRequestState, BlockRequestState, Current, Parent, RequestState, UnknownParentComponents, + BlobRequestState, BlockRequestState, Current, Parent, UnknownParentComponents, }; use crate::sync::range_sync::ByRangeRequestType; use beacon_chain::block_verification_types::AsBlock; @@ -84,7 +84,7 @@ pub const DELAY_QUEUE_CHANNEL_SIZE: usize = 128; pub type Id = u32; -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] pub enum ResponseType { Block, Blob, @@ -187,7 +187,6 @@ pub enum SyncMessage { BlockComponentProcessed { process_type: BlockProcessType, result: BlockProcessingResult, - response_type: ResponseType, }, } @@ -350,7 +349,7 @@ impl SyncManager { .single_block_lookup_failed::>( id, &peer_id, - &mut self.network, + &self.network, error, ); } @@ -359,7 +358,7 @@ impl SyncManager { .single_block_lookup_failed::>( id, &peer_id, - &mut self.network, + &self.network, error, ); } @@ -368,7 +367,7 @@ impl SyncManager { .parent_lookup_failed::>( id, peer_id, - &mut self.network, + &self.network, error, ); } @@ -377,7 +376,7 @@ impl SyncManager { .parent_lookup_failed::>( id, peer_id, - &mut self.network, + &self.network, error, ); } @@ -701,7 +700,7 @@ impl SyncManager { self.block_lookups.search_block_delayed( block_root, PeerShouldHave::Neither(peer_id), - &self.network, + &mut self.network, ); if let Err(e) = self .delayed_lookups @@ -722,7 +721,7 @@ impl SyncManager { SyncMessage::MissingGossipBlockComponentsDelayed(block_root) => { if self .block_lookups - .trigger_lookup_by_root(block_root, &mut self.network) + .trigger_lookup_by_root(block_root, &self.network) .is_err() { // No request was made for block or blob so the lookup is dropped. @@ -740,25 +739,24 @@ impl SyncManager { SyncMessage::BlockComponentProcessed { process_type, result, - response_type, } => match process_type { BlockProcessType::SingleBlock { id } => self .block_lookups .single_block_component_processed::>( id, result, - &self.network, + &mut self.network, ), BlockProcessType::SingleBlob { id } => self .block_lookups .single_block_component_processed::>( id, result, - &self.network, + &mut self.network, ), BlockProcessType::ParentLookup { chain_hash } => self .block_lookups - .parent_block_processed(chain_hash, result, response_type, &mut self.network), + .parent_block_processed(chain_hash, result, &mut self.network), }, SyncMessage::BatchProcessed { sync_type, result } => match sync_type { ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => { @@ -787,7 +785,7 @@ impl SyncManager { } ChainSegmentProcessId::ParentLookup(chain_hash) => self .block_lookups - .parent_chain_processed(chain_hash, result, &mut self.network), + .parent_chain_processed(chain_hash, result, &self.network), }, } } @@ -813,7 +811,7 @@ impl SyncManager { block_root, parent_components, &[PeerShouldHave::Neither(peer_id)], - &self.network, + &mut self.network, ); if let Err(e) = self .delayed_lookups @@ -953,7 +951,7 @@ impl SyncManager { seen_timestamp, &self.network, ), - RequestId::SingleBlob { id } => { + RequestId::SingleBlob { id: _ } => { crit!(self.log, "Blob received during block request"; "peer_id" => %peer_id ); } RequestId::ParentLookup { id } => self @@ -965,7 +963,7 @@ impl SyncManager { seen_timestamp, &self.network, ), - RequestId::ParentLookupBlob { id } => { + RequestId::ParentLookupBlob { id: _ } => { crit!(self.log, "Blob received during parent block request"; "peer_id" => %peer_id ); } RequestId::BackFillBlocks { id } => { @@ -1025,7 +1023,7 @@ impl SyncManager { seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { id } => { + RequestId::SingleBlock { id: _ } => { crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id ); } RequestId::SingleBlob { id } => self @@ -1035,10 +1033,10 @@ impl SyncManager { peer_id, blob, seen_timestamp, - &mut self.network, + &self.network, ), - RequestId::ParentLookup { id } => { + RequestId::ParentLookup { id: _ } => { crit!(self.log, "Single blob received during parent block request"; "peer_id" => %peer_id ); } RequestId::ParentLookupBlob { id } => self @@ -1048,7 +1046,7 @@ impl SyncManager { peer_id, blob, seen_timestamp, - &mut self.network, + &self.network, ), RequestId::BackFillBlocks { id: _ } => { crit!(self.log, "Blob received during backfill block request"; "peer_id" => %peer_id ); diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 21c25a1e5d..b7c6de2fc6 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -7,7 +7,7 @@ use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; -use crate::sync::block_lookups::{BlobRequestId, BlockRequestId}; +use crate::sync::block_lookups::LookupType; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use fnv::FnvHashMap; @@ -37,7 +37,7 @@ pub struct SyncNetworkContext { network_send: mpsc::UnboundedSender>, /// A sequential ID for all RPC requests. - request_id: std::cell::Cell, + request_id: Id, /// BlocksByRange requests made by the range syncing algorithm. range_requests: FnvHashMap, @@ -93,7 +93,7 @@ impl SyncNetworkContext { SyncNetworkContext { network_send, execution_engine_state: EngineState::Online, // always assume `Online` at the start - request_id: std::cell::Cell::new(1), + request_id: 1, range_requests: FnvHashMap::default(), backfill_requests: FnvHashMap::default(), range_blocks_and_blobs_requests: FnvHashMap::default(), @@ -463,7 +463,8 @@ impl SyncNetworkContext { "Sending BlobsByRoot Request"; "method" => "BlobsByRoot", "count" => blob_request.blob_ids.len(), - "peer" => %blob_peer_id + "peer" => %blob_peer_id, + "lookup_type" => ?lookup_type ); self.send_network_msg(NetworkMessage::SendRequest { @@ -539,10 +540,10 @@ impl SyncNetworkContext { &self.network_beacon_processor } - pub fn next_id(&self) -> Id { - let current_value = self.request_id.get(); - self.request_id.set(current_value + 1); - current_value + pub(crate) fn next_id(&mut self) -> Id { + let id = self.request_id; + self.request_id += 1; + id } /// Check whether a batch for this epoch (and only this epoch) should request just blocks or