Notify lookup sync of gossip processing results (#5722)

* Notify lookup sync of gossip processing results

* Add tests

* Add GossipBlockProcessResult event

* Re-add dropped comments

* Update beacon_node/network/src/network_beacon_processor/sync_methods.rs

* update test_lookup_disconnection_peer_left
This commit is contained in:
Lion - dapplion
2024-05-13 14:41:29 +03:00
committed by GitHub
parent 6d792b4280
commit 93e0649abc
12 changed files with 398 additions and 93 deletions

View File

@@ -314,6 +314,26 @@ pub struct BlockImportData<E: EthSpec> {
pub consensus_context: ConsensusContext<E>, pub consensus_context: ConsensusContext<E>,
} }
impl<E: EthSpec> BlockImportData<E> {
pub fn __new_for_test(
block_root: Hash256,
state: BeaconState<E>,
parent_block: SignedBeaconBlock<E, BlindedPayload<E>>,
) -> Self {
Self {
block_root,
state,
parent_block,
parent_eth1_finalization_data: Eth1FinalizationData {
eth1_data: <_>::default(),
eth1_deposit_index: 0,
},
confirmed_state_roots: vec![],
consensus_context: ConsensusContext::new(Slot::new(0)),
}
}
}
pub type GossipVerifiedBlockContents<E> = pub type GossipVerifiedBlockContents<E> =
(GossipVerifiedBlock<E>, Option<GossipVerifiedBlobList<E>>); (GossipVerifiedBlock<E>, Option<GossipVerifiedBlobList<E>>);

View File

@@ -84,10 +84,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
}) })
} }
/// Checks if the block root is currenlty in the availability cache awaiting processing because /// Checks if the block root is currenlty in the availability cache awaiting import because
/// of missing components. /// of missing components.
pub fn has_block(&self, block_root: &Hash256) -> bool { pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
self.availability_cache.has_block(block_root) self.availability_cache
.has_execution_valid_block(block_root)
} }
/// Return the required blobs `block_root` expects if the block is currenlty in the cache. /// Return the required blobs `block_root` expects if the block is currenlty in the cache.

View File

@@ -432,11 +432,6 @@ impl<T: BeaconChainTypes> Critical<T> {
Ok(()) Ok(())
} }
/// Returns true if the block root is known, without altering the LRU ordering
pub fn has_block(&self, block_root: &Hash256) -> bool {
self.in_memory.peek(block_root).is_some() || self.store_keys.contains(block_root)
}
/// This only checks for the blobs in memory /// This only checks for the blobs in memory
pub fn peek_blob( pub fn peek_blob(
&self, &self,
@@ -549,8 +544,12 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
} }
/// Returns true if the block root is known, without altering the LRU ordering /// Returns true if the block root is known, without altering the LRU ordering
pub fn has_block(&self, block_root: &Hash256) -> bool { pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
self.critical.read().has_block(block_root) if let Some(pending_components) = self.critical.read().peek_pending_components(block_root) {
pending_components.executed_block.is_some()
} else {
false
}
} }
/// Fetch a blob from the cache without affecting the LRU ordering /// Fetch a blob from the cache without affecting the LRU ordering

View File

@@ -1187,19 +1187,18 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"block_root" => %block_root, "block_root" => %block_root,
); );
} }
Err(BlockError::ParentUnknown(block)) => { Err(BlockError::ParentUnknown(_)) => {
// Inform the sync manager to find parents for this block // This should not occur. It should be checked by `should_forward_block`.
// This should not occur. It should be checked by `should_forward_block` // Do not send sync message UnknownParentBlock to prevent conflicts with the
// BlockComponentProcessed message below. If this error ever happens, lookup sync
// can recover by receiving another block / blob / attestation referencing the
// chain that includes this block.
error!( error!(
self.log, self.log,
"Block with unknown parent attempted to be processed"; "Block with unknown parent attempted to be processed";
"block_root" => %block_root,
"peer_id" => %peer_id "peer_id" => %peer_id
); );
self.send_sync_message(SyncMessage::UnknownParentBlock(
peer_id,
block.clone(),
block_root,
));
} }
Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => { Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => {
debug!( debug!(
@@ -1263,6 +1262,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
&self.log, &self.log,
); );
} }
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))),
});
} }
pub fn process_gossip_voluntary_exit( pub fn process_gossip_voluntary_exit(

View File

@@ -1,7 +1,5 @@
use crate::{ use crate::sync::manager::BlockProcessType;
service::NetworkMessage, use crate::{service::NetworkMessage, sync::manager::SyncMessage};
sync::{manager::BlockProcessType, SyncMessage},
};
use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain}; use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain};
use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer}; use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer};

View File

@@ -170,7 +170,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
if reprocess_tx.try_send(reprocess_msg).is_err() { if reprocess_tx.try_send(reprocess_msg).is_err() {
error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash) error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash)
}; };
if matches!(process_type, BlockProcessType::SingleBlock { .. }) {
self.chain.block_times_cache.write().set_time_observed( self.chain.block_times_cache.write().set_time_observed(
hash, hash,
slot, slot,
@@ -181,7 +180,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.chain.recompute_head_at_current_slot().await; self.chain.recompute_head_at_current_slot().await;
} }
}
// Sync handles these results // Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed { self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type, process_type,

View File

@@ -2,8 +2,8 @@ use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, SingleBlockLookup, SingleLookupRequestState, LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
}; };
use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId}; use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId};
use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE}; use crate::sync::manager::{Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::SyncNetworkContext; use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes; use beacon_chain::BeaconChainTypes;
use std::sync::Arc; use std::sync::Arc;
@@ -45,7 +45,7 @@ pub trait RequestState<T: BeaconChainTypes> {
peer_id: PeerId, peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>, downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError>; ) -> Result<LookupRequestResult, LookupRequestError>;
/* Response handling methods */ /* Response handling methods */
@@ -80,7 +80,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
peer_id: PeerId, peer_id: PeerId,
_: Option<usize>, _: Option<usize>,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError> { ) -> Result<LookupRequestResult, LookupRequestError> {
cx.block_lookup_request(id, peer_id, self.requested_block_root) cx.block_lookup_request(id, peer_id, self.requested_block_root)
.map_err(LookupRequestError::SendFailed) .map_err(LookupRequestError::SendFailed)
} }
@@ -97,10 +97,10 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
peer_id: _, peer_id: _,
} = download_result; } = download_result;
cx.send_block_for_processing( cx.send_block_for_processing(
id,
block_root, block_root,
RpcBlock::new_without_blobs(Some(block_root), value), RpcBlock::new_without_blobs(Some(block_root), value),
seen_timestamp, seen_timestamp,
BlockProcessType::SingleBlock { id },
) )
.map_err(LookupRequestError::SendFailed) .map_err(LookupRequestError::SendFailed)
} }
@@ -128,7 +128,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
peer_id: PeerId, peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>, downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError> { ) -> Result<LookupRequestResult, LookupRequestError> {
cx.blob_lookup_request( cx.blob_lookup_request(
id, id,
peer_id, peer_id,
@@ -149,12 +149,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
seen_timestamp, seen_timestamp,
peer_id: _, peer_id: _,
} = download_result; } = download_result;
cx.send_blobs_for_processing( cx.send_blobs_for_processing(id, block_root, value, seen_timestamp)
block_root,
value,
seen_timestamp,
BlockProcessType::SingleBlob { id },
)
.map_err(LookupRequestError::SendFailed) .map_err(LookupRequestError::SendFailed)
} }

View File

@@ -408,7 +408,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.on_processing_result_inner::<BlobRequestState<T::EthSpec>>(id, result, cx) self.on_processing_result_inner::<BlobRequestState<T::EthSpec>>(id, result, cx)
} }
}; };
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx); let id = match process_type {
BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } => id,
};
self.on_lookup_result(id, lookup_result, "processing_result", cx);
} }
pub fn on_processing_result_inner<R: RequestState<T>>( pub fn on_processing_result_inner<R: RequestState<T>>(
@@ -521,6 +524,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
} }
other => { other => {
debug!(self.log, "Invalid lookup component"; "block_root" => ?block_root, "component" => ?R::response_type(), "error" => ?other); debug!(self.log, "Invalid lookup component"; "block_root" => ?block_root, "component" => ?R::response_type(), "error" => ?other);
let peer_id = request_state.on_processing_failure()?; let peer_id = request_state.on_processing_failure()?;
cx.report_peer( cx.report_peer(
peer_id, peer_id,
@@ -561,6 +565,30 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
} }
} }
pub fn on_external_processing_result(
&mut self,
block_root: Hash256,
imported: bool,
cx: &mut SyncNetworkContext<T>,
) {
let Some((id, lookup)) = self
.single_block_lookups
.iter_mut()
.find(|(_, lookup)| lookup.is_for_block(block_root))
else {
// Ok to ignore gossip process events
return;
};
let lookup_result = if imported {
Ok(LookupResult::Completed)
} else {
lookup.continue_requests(cx)
};
let id = *id;
self.on_lookup_result(id, lookup_result, "external_processing_result", cx);
}
/// Makes progress on the immediate children of `block_root` /// Makes progress on the immediate children of `block_root`
pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext<T>) { pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext<T>) {
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self

View File

@@ -2,7 +2,7 @@ use super::common::ResponseType;
use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS};
use crate::sync::block_lookups::common::RequestState; use crate::sync::block_lookups::common::RequestState;
use crate::sync::block_lookups::Id; use crate::sync::block_lookups::Id;
use crate::sync::network_context::SyncNetworkContext; use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
use beacon_chain::BeaconChainTypes; use beacon_chain::BeaconChainTypes;
use itertools::Itertools; use itertools::Itertools;
use rand::seq::IteratorRandom; use rand::seq::IteratorRandom;
@@ -179,11 +179,13 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
.use_rand_available_peer() .use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)?; .ok_or(LookupRequestError::NoPeers)?;
// make_request returns true only if a request needs to be made match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
if request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? { LookupRequestResult::RequestSent => request.get_state_mut().on_download_start()?,
request.get_state_mut().on_download_start()?; LookupRequestResult::NoRequestNeeded => {
} else { request.get_state_mut().on_completed_request()?
request.get_state_mut().on_completed_request()?; }
// Sync will receive a future event to make progress on the request, do nothing now
LookupRequestResult::Pending => return Ok(()),
} }
// Otherwise, attempt to progress awaiting processing // Otherwise, attempt to progress awaiting processing
@@ -262,12 +264,16 @@ pub struct DownloadResult<T: Clone> {
pub peer_id: PeerId, pub peer_id: PeerId,
} }
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum State<T: Clone> { pub enum State<T: Clone> {
AwaitingDownload, AwaitingDownload,
Downloading, Downloading,
AwaitingProcess(DownloadResult<T>), AwaitingProcess(DownloadResult<T>),
/// Request is processing, sent by lookup sync
Processing(DownloadResult<T>), Processing(DownloadResult<T>),
/// Request is processed:
/// - `Processed(Some)` if lookup sync downloaded and sent to process this request
/// - `Processed(None)` if another source (i.e. gossip) sent this component for processing
Processed(Option<PeerId>), Processed(Option<PeerId>),
} }
@@ -428,12 +434,11 @@ impl<T: Clone> SingleLookupRequestState<T> {
} }
} }
pub fn on_processing_success(&mut self) -> Result<PeerId, LookupRequestError> { pub fn on_processing_success(&mut self) -> Result<(), LookupRequestError> {
match &self.state { match &self.state {
State::Processing(result) => { State::Processing(result) => {
let peer_id = result.peer_id; self.state = State::Processed(Some(result.peer_id));
self.state = State::Processed(Some(peer_id)); Ok(())
Ok(peer_id)
} }
other => Err(LookupRequestError::BadState(format!( other => Err(LookupRequestError::BadState(format!(
"Bad state on_processing_success expected Processing got {other}" "Bad state on_processing_success expected Processing got {other}"
@@ -514,12 +519,6 @@ impl<T: Clone> SingleLookupRequestState<T> {
impl<T: Clone> std::fmt::Display for State<T> { impl<T: Clone> std::fmt::Display for State<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { write!(f, "{}", Into::<&'static str>::into(self))
State::AwaitingDownload => write!(f, "AwaitingDownload"),
State::Downloading { .. } => write!(f, "Downloading"),
State::AwaitingProcess { .. } => write!(f, "AwaitingProcessing"),
State::Processing { .. } => write!(f, "Processing"),
State::Processed { .. } => write!(f, "Processed"),
}
} }
} }

View File

@@ -11,12 +11,17 @@ use std::sync::Arc;
use super::*; use super::*;
use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE}; use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE};
use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::blob_verification::GossipVerifiedBlob;
use beacon_chain::block_verification_types::{BlockImportData, RpcBlock};
use beacon_chain::builder::Witness; use beacon_chain::builder::Witness;
use beacon_chain::data_availability_checker::Availability;
use beacon_chain::eth1_chain::CachingEth1Backend; use beacon_chain::eth1_chain::CachingEth1Backend;
use beacon_chain::test_utils::{ use beacon_chain::test_utils::{
build_log, generate_rand_block_and_blobs, BeaconChainHarness, EphemeralHarnessType, NumBlobs, build_log, generate_rand_block_and_blobs, BeaconChainHarness, EphemeralHarnessType, NumBlobs,
}; };
use beacon_chain::{
AvailabilityPendingExecutedBlock, PayloadVerificationOutcome, PayloadVerificationStatus,
};
use beacon_processor::WorkEvent; use beacon_processor::WorkEvent;
use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode}; use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode};
use lighthouse_network::types::SyncState; use lighthouse_network::types::SyncState;
@@ -25,10 +30,12 @@ use slog::info;
use slot_clock::{ManualSlotClock, SlotClock, TestingSlotClock}; use slot_clock::{ManualSlotClock, SlotClock, TestingSlotClock};
use store::MemoryStore; use store::MemoryStore;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use types::test_utils::TestRandom;
use types::{ use types::{
test_utils::{SeedableRng, XorShiftRng}, test_utils::{SeedableRng, XorShiftRng},
BlobSidecar, ForkName, MinimalEthSpec as E, SignedBeaconBlock, Slot, BlobSidecar, ForkName, MinimalEthSpec as E, SignedBeaconBlock, Slot,
}; };
use types::{BeaconState, BeaconStateBase};
type T = Witness<ManualSlotClock, CachingEth1Backend<E>, E, MemoryStore<E>, MemoryStore<E>>; type T = Witness<ManualSlotClock, CachingEth1Backend<E>, E, MemoryStore<E>, MemoryStore<E>>;
@@ -68,6 +75,8 @@ struct TestRig {
sync_manager: SyncManager<T>, sync_manager: SyncManager<T>,
/// To manipulate sync state and peer connection status /// To manipulate sync state and peer connection status
network_globals: Arc<NetworkGlobals<E>>, network_globals: Arc<NetworkGlobals<E>>,
/// Beacon chain harness
harness: BeaconChainHarness<EphemeralHarnessType<E>>,
/// `rng` for generating test blocks and blobs. /// `rng` for generating test blocks and blobs.
rng: XorShiftRng, rng: XorShiftRng,
fork_name: ForkName, fork_name: ForkName,
@@ -129,6 +138,7 @@ impl TestRig {
sync_recv, sync_recv,
log.clone(), log.clone(),
), ),
harness,
fork_name, fork_name,
log, log,
} }
@@ -423,6 +433,63 @@ impl TestRig {
}); });
} }
fn complete_single_lookup_blob_download(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
blobs: Vec<BlobSidecar<E>>,
) {
for blob in blobs {
self.single_lookup_blob_response(id, peer_id, Some(blob.into()));
}
self.single_lookup_blob_response(id, peer_id, None);
}
fn complete_single_lookup_blob_lookup_valid(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
blobs: Vec<BlobSidecar<E>>,
import: bool,
) {
let block_root = blobs.first().unwrap().block_root();
let block_slot = blobs.first().unwrap().slot();
self.complete_single_lookup_blob_download(id, peer_id, blobs);
self.expect_block_process(ResponseType::Blob);
self.single_blob_component_processed(
id.lookup_id,
if import {
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root))
} else {
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
block_slot, block_root,
))
},
);
}
fn complete_single_lookup_block_valid(&mut self, block: SignedBeaconBlock<E>, import: bool) {
let block_root = block.canonical_root();
let block_slot = block.slot();
let id = self.expect_block_lookup_request(block_root);
self.expect_empty_network();
let peer_id = self.new_connected_peer();
self.single_lookup_block_response(id, peer_id, Some(block.into()));
self.single_lookup_block_response(id, peer_id, None);
self.expect_block_process(ResponseType::Block);
let id = self.find_single_lookup_for(block_root);
self.single_block_component_processed(
id,
if import {
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root))
} else {
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
block_slot, block_root,
))
},
)
}
fn parent_lookup_failed(&mut self, id: SingleLookupReqId, peer_id: PeerId, error: RPCError) { fn parent_lookup_failed(&mut self, id: SingleLookupReqId, peer_id: PeerId, error: RPCError) {
self.send_sync_message(SyncMessage::RpcError { self.send_sync_message(SyncMessage::RpcError {
peer_id, peer_id,
@@ -714,6 +781,89 @@ impl TestRig {
)); ));
blocks blocks
} }
fn insert_block_to_da_checker(&mut self, block: Arc<SignedBeaconBlock<E>>) {
let state = BeaconState::Base(BeaconStateBase::random_for_test(&mut self.rng));
let parent_block = self.rand_block();
let import_data = BlockImportData::<E>::__new_for_test(
block.canonical_root(),
state,
parent_block.into(),
);
let payload_verification_outcome = PayloadVerificationOutcome {
payload_verification_status: PayloadVerificationStatus::Verified,
is_valid_merge_transition_block: false,
};
let executed_block =
AvailabilityPendingExecutedBlock::new(block, import_data, payload_verification_outcome);
match self
.harness
.chain
.data_availability_checker
.put_pending_executed_block(executed_block)
.unwrap()
{
Availability::Available(_) => panic!("block removed from da_checker, available"),
Availability::MissingComponents(block_root) => {
self.log(&format!("inserted block to da_checker {block_root:?}"))
}
};
}
fn insert_blob_to_da_checker(&mut self, blob: BlobSidecar<E>) {
match self
.harness
.chain
.data_availability_checker
.put_gossip_blob(GossipVerifiedBlob::__assumed_valid(blob.into()))
.unwrap()
{
Availability::Available(_) => panic!("blob removed from da_checker, available"),
Availability::MissingComponents(block_root) => {
self.log(&format!("inserted blob to da_checker {block_root:?}"))
}
};
}
fn insert_block_to_processing_cache(&mut self, block: Arc<SignedBeaconBlock<E>>) {
self.harness
.chain
.reqresp_pre_import_cache
.write()
.insert(block.canonical_root(), block);
}
fn simulate_block_gossip_processing_becomes_invalid(&mut self, block_root: Hash256) {
self.harness
.chain
.reqresp_pre_import_cache
.write()
.remove(&block_root);
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: false,
});
}
fn simulate_block_gossip_processing_becomes_valid_missing_components(
&mut self,
block: Arc<SignedBeaconBlock<E>>,
) {
let block_root = block.canonical_root();
self.harness
.chain
.reqresp_pre_import_cache
.write()
.remove(&block_root);
self.insert_block_to_da_checker(block);
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: false,
});
}
} }
#[test] #[test]
@@ -1111,17 +1261,17 @@ fn test_parent_lookup_disconnection_no_peers_left() {
} }
#[test] #[test]
fn test_parent_lookup_disconnection_peer_left() { fn test_lookup_disconnection_peer_left() {
let mut rig = TestRig::test_setup(); let mut rig = TestRig::test_setup();
let peer_ids = (0..2).map(|_| rig.new_connected_peer()).collect::<Vec<_>>(); let peer_ids = (0..2).map(|_| rig.new_connected_peer()).collect::<Vec<_>>();
let trigger_block = rig.rand_block(); let block_root = Hash256::random();
// lookup should have two peers associated with the same block // lookup should have two peers associated with the same block
for peer_id in peer_ids.iter() { for peer_id in peer_ids.iter() {
rig.trigger_unknown_parent_block(*peer_id, trigger_block.clone().into()); rig.trigger_unknown_block_from_attestation(block_root, *peer_id);
} }
// Disconnect the first peer only, which is the one handling the request // Disconnect the first peer only, which is the one handling the request
rig.peer_disconnected(*peer_ids.first().unwrap()); rig.peer_disconnected(*peer_ids.first().unwrap());
rig.assert_parent_lookups_count(1); rig.assert_single_lookups_count(1);
} }
#[test] #[test]
@@ -1254,6 +1404,87 @@ fn test_same_chain_race_condition() {
rig.expect_no_active_lookups(); rig.expect_no_active_lookups();
} }
#[test]
fn block_in_da_checker_skips_download() {
let Some(mut r) = TestRig::test_setup_after_deneb() else {
return;
};
let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(1));
let block_root = block.canonical_root();
let peer_id = r.new_connected_peer();
r.insert_block_to_da_checker(block.into());
r.trigger_unknown_block_from_attestation(block_root, peer_id);
// Should not trigger block request
let id = r.expect_blob_lookup_request(block_root);
r.expect_empty_network();
// Resolve blob and expect lookup completed
r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true);
r.expect_no_active_lookups();
}
#[test]
fn block_in_processing_cache_becomes_invalid() {
let Some(mut r) = TestRig::test_setup_after_deneb() else {
return;
};
let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(1));
let block_root = block.canonical_root();
let peer_id = r.new_connected_peer();
r.insert_block_to_processing_cache(block.clone().into());
r.trigger_unknown_block_from_attestation(block_root, peer_id);
// Should not trigger block request
let id = r.expect_blob_lookup_request(block_root);
r.expect_empty_network();
// Simulate invalid block, removing it from processing cache
r.simulate_block_gossip_processing_becomes_invalid(block_root);
// Should download and process the block
r.complete_single_lookup_block_valid(block, false);
// Resolve blob and expect lookup completed
r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true);
r.expect_no_active_lookups();
}
#[test]
fn block_in_processing_cache_becomes_valid_imported() {
let Some(mut r) = TestRig::test_setup_after_deneb() else {
return;
};
let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(1));
let block_root = block.canonical_root();
let peer_id = r.new_connected_peer();
r.insert_block_to_processing_cache(block.clone().into());
r.trigger_unknown_block_from_attestation(block_root, peer_id);
// Should not trigger block request
let id = r.expect_blob_lookup_request(block_root);
r.expect_empty_network();
// Resolve the block from processing step
r.simulate_block_gossip_processing_becomes_valid_missing_components(block.into());
// Resolve blob and expect lookup completed
r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true);
r.expect_no_active_lookups();
}
// IGNORE: wait for change that delays blob fetching to knowing the block
#[ignore]
#[test]
fn blobs_in_da_checker_skip_download() {
let Some(mut r) = TestRig::test_setup_after_deneb() else {
return;
};
let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(1));
let block_root = block.canonical_root();
let peer_id = r.new_connected_peer();
for blob in blobs {
r.insert_blob_to_da_checker(blob);
}
r.trigger_unknown_block_from_attestation(block_root, peer_id);
// Should download and process the block
r.complete_single_lookup_block_valid(block, true);
// Should not trigger blob request
r.expect_empty_network();
r.expect_no_active_lookups();
}
mod deneb_only { mod deneb_only {
use super::*; use super::*;
use beacon_chain::{ use beacon_chain::{

View File

@@ -144,6 +144,9 @@ pub enum SyncMessage<E: EthSpec> {
process_type: BlockProcessType, process_type: BlockProcessType,
result: BlockProcessingResult<E>, result: BlockProcessingResult<E>,
}, },
/// A block from gossip has completed processing,
GossipBlockProcessResult { block_root: Hash256, imported: bool },
} }
/// The type of processing specified for a received block. /// The type of processing specified for a received block.
@@ -153,14 +156,6 @@ pub enum BlockProcessType {
SingleBlob { id: Id }, SingleBlob { id: Id },
} }
impl BlockProcessType {
pub fn id(&self) -> Id {
match self {
BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } => *id,
}
}
}
#[derive(Debug)] #[derive(Debug)]
pub enum BlockProcessingResult<E: EthSpec> { pub enum BlockProcessingResult<E: EthSpec> {
Ok(AvailabilityProcessingStatus), Ok(AvailabilityProcessingStatus),
@@ -637,6 +632,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} => self } => self
.block_lookups .block_lookups
.on_processing_result(process_type, result, &mut self.network), .on_processing_result(process_type, result, &mut self.network),
SyncMessage::GossipBlockProcessResult {
block_root,
imported,
} => self.block_lookups.on_external_processing_result(
block_root,
imported,
&mut self.network,
),
SyncMessage::BatchProcessed { sync_type, result } => match sync_type { SyncMessage::BatchProcessed { sync_type, result } => match sync_type {
ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => { ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => {
self.range_sync.handle_block_process_result( self.range_sync.handle_block_process_result(

View File

@@ -4,13 +4,13 @@
use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest}; use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest};
pub use self::requests::{BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest}; pub use self::requests::{BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest};
use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo;
use super::manager::{BlockProcessType, Id, RequestId as SyncRequestId}; use super::manager::{Id, RequestId as SyncRequestId};
use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::service::{NetworkMessage, RequestId}; use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage; use crate::status::ToStatusMessage;
use crate::sync::block_lookups::SingleLookupId; use crate::sync::block_lookups::SingleLookupId;
use crate::sync::manager::SingleLookupReqId; use crate::sync::manager::{BlockProcessType, SingleLookupReqId};
use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::validator_monitor::timestamp_now;
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
@@ -81,6 +81,19 @@ impl From<LookupVerifyError> for LookupFailure {
} }
} }
pub enum LookupRequestResult {
/// A request is sent. Sync MUST receive an event from the network in the future for either:
/// completed response or failed request
RequestSent,
/// No request is sent, and no further action is necessary to consider this request completed
NoRequestNeeded,
/// No request is sent, but the request is not completed. Sync MUST receive some future event
/// that makes progress on the request. For example: request is processing from a different
/// source (i.e. block received from gossip) and sync MUST receive an event with that processing
/// result.
Pending,
}
/// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id.
pub struct SyncNetworkContext<T: BeaconChainTypes> { pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// The network channel to relay messages to the Network service. /// The network channel to relay messages to the Network service.
@@ -305,14 +318,27 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
lookup_id: SingleLookupId, lookup_id: SingleLookupId,
peer_id: PeerId, peer_id: PeerId,
block_root: Hash256, block_root: Hash256,
) -> Result<bool, &'static str> { ) -> Result<LookupRequestResult, &'static str> {
// da_checker includes block that are execution verified, but are missing components
if self
.chain
.data_availability_checker
.has_execution_valid_block(&block_root)
{
return Ok(LookupRequestResult::NoRequestNeeded);
}
// reqresp_pre_import_cache includes blocks that may not be yet execution verified
if self if self
.chain .chain
.reqresp_pre_import_cache .reqresp_pre_import_cache
.read() .read()
.contains_key(&block_root) .contains_key(&block_root)
{ {
return Ok(false); // A block is on the `reqresp_pre_import_cache` but NOT in the
// `data_availability_checker` only if it is actively processing. We can expect a future
// event with the result of processing
return Ok(LookupRequestResult::Pending);
} }
let id = SingleLookupReqId { let id = SingleLookupReqId {
@@ -340,7 +366,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.blocks_by_root_requests self.blocks_by_root_requests
.insert(id, ActiveBlocksByRootRequest::new(request)); .insert(id, ActiveBlocksByRootRequest::new(request));
Ok(true) Ok(LookupRequestResult::RequestSent)
} }
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
@@ -355,7 +381,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
peer_id: PeerId, peer_id: PeerId,
block_root: Hash256, block_root: Hash256,
downloaded_block_expected_blobs: Option<usize>, downloaded_block_expected_blobs: Option<usize>,
) -> Result<bool, &'static str> { ) -> Result<LookupRequestResult, &'static str> {
let expected_blobs = downloaded_block_expected_blobs let expected_blobs = downloaded_block_expected_blobs
.or_else(|| { .or_else(|| {
self.chain self.chain
@@ -387,7 +413,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
if indices.is_empty() { if indices.is_empty() {
// No blobs required, do not issue any request // No blobs required, do not issue any request
return Ok(false); return Ok(LookupRequestResult::NoRequestNeeded);
} }
let id = SingleLookupReqId { let id = SingleLookupReqId {
@@ -419,7 +445,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.blobs_by_root_requests self.blobs_by_root_requests
.insert(id, ActiveBlobsByRootRequest::new(request)); .insert(id, ActiveBlobsByRootRequest::new(request));
Ok(true) Ok(LookupRequestResult::RequestSent)
} }
pub fn is_execution_engine_online(&self) -> bool { pub fn is_execution_engine_online(&self) -> bool {
@@ -595,19 +621,19 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn send_block_for_processing( pub fn send_block_for_processing(
&self, &self,
id: Id,
block_root: Hash256, block_root: Hash256,
block: RpcBlock<T::EthSpec>, block: RpcBlock<T::EthSpec>,
duration: Duration, duration: Duration,
process_type: BlockProcessType,
) -> Result<(), &'static str> { ) -> Result<(), &'static str> {
match self.beacon_processor_if_enabled() { match self.beacon_processor_if_enabled() {
Some(beacon_processor) => { Some(beacon_processor) => {
debug!(self.log, "Sending block for processing"; "block" => ?block_root, "process" => ?process_type); debug!(self.log, "Sending block for processing"; "block" => ?block_root, "id" => id);
if let Err(e) = beacon_processor.send_rpc_beacon_block( if let Err(e) = beacon_processor.send_rpc_beacon_block(
block_root, block_root,
block, block,
duration, duration,
process_type, BlockProcessType::SingleBlock { id },
) { ) {
error!( error!(
self.log, self.log,
@@ -628,17 +654,20 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn send_blobs_for_processing( pub fn send_blobs_for_processing(
&self, &self,
id: Id,
block_root: Hash256, block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>, blobs: FixedBlobSidecarList<T::EthSpec>,
duration: Duration, duration: Duration,
process_type: BlockProcessType,
) -> Result<(), &'static str> { ) -> Result<(), &'static str> {
match self.beacon_processor_if_enabled() { match self.beacon_processor_if_enabled() {
Some(beacon_processor) => { Some(beacon_processor) => {
debug!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process_type" => ?process_type); debug!(self.log, "Sending blobs for processing"; "block" => ?block_root, "id" => id);
if let Err(e) = if let Err(e) = beacon_processor.send_rpc_blobs(
beacon_processor.send_rpc_blobs(block_root, blobs, duration, process_type) block_root,
{ blobs,
duration,
BlockProcessType::SingleBlob { id },
) {
error!( error!(
self.log, self.log,
"Failed to send sync blobs to processor"; "Failed to send sync blobs to processor";