make block and blob single lookups generic

This commit is contained in:
realbigsean
2023-07-25 10:45:58 -04:00
parent 985bbc5a55
commit 8a6e8d51b6
7 changed files with 1114 additions and 1401 deletions

View File

@@ -482,7 +482,10 @@ impl<T: BeaconChainTypes> Router<T> {
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => {
SyncId::SingleBlock { .. }
| SyncId::SingleBlob { .. }
| SyncId::ParentLookup { .. }
| SyncId::ParentLookupBlob { .. } => {
unreachable!("Block lookups do not request BBRange requests")
}
id @ (SyncId::BackFillBlocks { .. }
@@ -550,6 +553,9 @@ impl<T: BeaconChainTypes> Router<T> {
| SyncId::BackFillBlockAndBlobs { .. } => {
unreachable!("Batch syncing do not request BBRoot requests")
}
SyncId::SingleBlob { .. } | SyncId::ParentLookupBlob { .. } => {
unreachable!("Blob response to block by roots request")
}
},
RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
};
@@ -576,7 +582,10 @@ impl<T: BeaconChainTypes> Router<T> {
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
id @ (SyncId::SingleBlob { .. } | SyncId::ParentLookupBlob { .. }) => id,
SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => {
unreachable!("Block response to blobs by roots request")
}
SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::RangeBlockAndBlobs { .. }

File diff suppressed because it is too large Load Diff

View File

@@ -1,18 +1,21 @@
use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup};
use super::{BlobRequestId, BlockRequestId, DownloadedBlocks, PeerShouldHave, ResponseType};
use crate::sync::block_lookups::single_block_lookup::{State, UnknownParentComponents};
use crate::sync::block_lookups::{RootBlobsTuple, RootBlockTuple};
use super::{DownloadedBlocks, PeerShouldHave};
use crate::sync::block_lookups::single_block_lookup::{
Parent, RequestState, State, UnknownParentComponents,
};
use crate::sync::block_lookups::Lookup;
use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_availability_checker::DataAvailabilityChecker;
use beacon_chain::BeaconChainTypes;
use itertools::Itertools;
use lighthouse_network::PeerId;
use std::sync::Arc;
use store::Hash256;
use strum::IntoStaticStr;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlobSidecar, SignedBeaconBlock};
use types::SignedBeaconBlock;
/// How many attempts we try to find a parent of a block before we give up trying.
pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5;
@@ -28,7 +31,7 @@ pub(crate) struct ParentLookup<T: BeaconChainTypes> {
/// The blocks that have currently been downloaded.
downloaded_blocks: Vec<DownloadedBlocks<T::EthSpec>>,
/// Request of the last parent.
pub current_parent_request: SingleBlockLookup<PARENT_FAIL_TOLERANCE, T>,
pub current_parent_request: SingleBlockLookup<Parent, T>,
}
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
@@ -55,6 +58,7 @@ pub enum RequestError {
cannot_process: bool,
},
NoPeers,
AlreadyDownloaded,
}
impl<T: BeaconChainTypes> ParentLookup<T> {
@@ -63,9 +67,15 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
parent_root: Hash256,
peer_id: PeerShouldHave,
da_checker: Arc<DataAvailabilityChecker<T>>,
cx: &SyncNetworkContext<T>,
) -> Self {
let current_parent_request =
SingleBlockLookup::new(parent_root, Some(<_>::default()), &[peer_id], da_checker);
let current_parent_request = SingleBlockLookup::new(
parent_root,
Some(<_>::default()),
&[peer_id],
da_checker,
cx,
);
Self {
chain_hash: block_root,
@@ -85,52 +95,15 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
}
/// Attempts to request the next unknown parent. If the request fails, it should be removed.
pub fn request_parent_block(
&mut self,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), RequestError> {
pub fn request_parent(&mut self, cx: &SyncNetworkContext<T>) -> Result<(), RequestError> {
// check to make sure this request hasn't failed
if self.downloaded_blocks.len() + 1 >= PARENT_DEPTH_TOLERANCE {
return Err(RequestError::ChainTooLong);
}
if let Some((peer_id, request)) = self.current_parent_request.request_block()? {
match cx.parent_lookup_block_request(peer_id, request) {
Ok(request_id) => {
self.current_parent_request.id.block_request_id = Some(request_id);
return Ok(());
}
Err(reason) => {
self.current_parent_request.id.block_request_id = None;
return Err(RequestError::SendFailed(reason));
}
}
}
Ok(())
}
pub fn request_parent_blobs(
&mut self,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), RequestError> {
// check to make sure this request hasn't failed
if self.downloaded_blocks.len() + 1 >= PARENT_DEPTH_TOLERANCE {
return Err(RequestError::ChainTooLong);
}
if let Some((peer_id, request)) = self.current_parent_request.request_blobs()? {
match cx.parent_lookup_blobs_request(peer_id, request) {
Ok(request_id) => {
self.current_parent_request.id.blob_request_id = Some(request_id);
return Ok(());
}
Err(reason) => {
self.current_parent_request.id.blob_request_id = None;
return Err(RequestError::SendFailed(reason));
}
}
}
Ok(())
self.current_parent_request
.request_block_and_blobs(cx)
.map_err(Into::into)
}
pub fn check_block_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> {
@@ -162,11 +135,9 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
.block_request_state
.requested_block_root = next_parent;
self.current_parent_request.block_request_state.state.state = State::AwaitingDownload;
self.current_parent_request.id.block_request_id = None;
// Update the blobs request.
self.current_parent_request.blob_request_state.state.state = State::AwaitingDownload;
self.current_parent_request.id.blob_request_id = None;
// Reset the unknown parent components.
self.current_parent_request.unknown_parent_components =
@@ -176,25 +147,24 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
pub fn add_current_request_block(&mut self, block: Arc<SignedBeaconBlock<T::EthSpec>>) {
// Cache the block.
self.current_parent_request.add_unknown_parent_block(block);
// Update the request.
self.current_parent_request.id.block_request_id = None;
}
pub fn add_current_request_blobs(&mut self, blobs: FixedBlobSidecarList<T::EthSpec>) {
// Cache the blobs.
self.current_parent_request.add_unknown_parent_blobs(blobs);
// Update the request.
self.current_parent_request.id.blob_request_id = None;
}
pub fn pending_block_response(&self, req_id: BlockRequestId) -> bool {
self.current_parent_request.id.block_request_id == Some(req_id)
}
pub fn pending_blob_response(&self, req_id: BlobRequestId) -> bool {
self.current_parent_request.id.blob_request_id == Some(req_id)
pub fn processing_peer(&self) -> Result<PeerShouldHave, ()> {
self.current_parent_request
.block_request_state
.state
.processing_peer()
.or_else(|()| {
self.current_parent_request
.blob_request_state
.state
.processing_peer()
})
}
/// Consumes the parent request and destructures it into it's parts.
@@ -205,7 +175,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
Hash256,
Vec<RpcBlock<T::EthSpec>>,
Vec<Hash256>,
SingleBlockLookup<PARENT_FAIL_TOLERANCE, T>,
SingleBlockLookup<Parent, T>,
) {
let ParentLookup {
chain_hash,
@@ -227,26 +197,14 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
self.chain_hash
}
pub fn block_download_failed(&mut self) {
pub fn processing_failed(&mut self) {
self.current_parent_request
.block_request_state
.state
.register_failure_downloading();
self.current_parent_request.id.block_request_id = None;
}
pub fn blob_download_failed(&mut self) {
.register_failure_processing();
self.current_parent_request
.blob_request_state
.state
.register_failure_downloading();
self.current_parent_request.id.blob_request_id = None;
}
pub fn block_processing_failed(&mut self) {
self.current_parent_request
.block_request_state
.state
.register_failure_processing();
if let Some(components) = self
.current_parent_request
@@ -254,46 +212,33 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
.as_mut()
{
components.downloaded_block = None;
}
self.current_parent_request.id.block_request_id = None;
}
pub fn blob_processing_failed(&mut self) {
self.current_parent_request
.blob_request_state
.state
.register_failure_processing();
if let Some(components) = self
.current_parent_request
.unknown_parent_components
.as_mut()
{
components.downloaded_blobs = <_>::default();
}
self.current_parent_request.id.blob_request_id = None;
}
/// Verifies that the received block is what we requested. If so, parent lookup now waits for
/// the processing result of the block.
pub fn verify_block(
pub fn verify_block<R: RequestState<Parent, T>>(
&mut self,
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
block: Option<R::ResponseType>,
failed_chains: &mut lru_cache::LRUTimeCache<Hash256>,
) -> Result<Option<RootBlockTuple<T::EthSpec>>, ParentVerifyError> {
let root_and_block = self.current_parent_request.verify_block(block)?;
) -> Result<Option<(Hash256, R::VerifiedResponseType)>, ParentVerifyError> {
let expected_block_root = self
.current_parent_request
.block_request_state
.requested_block_root;
let request_state = R::request_state_mut(&mut self.current_parent_request);
let root_and_block = request_state.verify_response(expected_block_root, block)?;
// check if the parent of this block isn't in the failed cache. If it is, this chain should
// be dropped and the peer downscored.
if let Some(parent_root) = root_and_block
.as_ref()
.map(|(_, block)| block.parent_root())
.map(|(_, block)| R::get_parent_root(block))
.flatten()
{
if failed_chains.contains(&parent_root) {
self.current_parent_request
.block_request_state
.state
.register_failure_downloading();
self.current_parent_request.id.block_request_id = None;
request_state.register_failure_downloading();
return Err(ParentVerifyError::PreviousFailure { parent_root });
}
}
@@ -301,49 +246,24 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
Ok(root_and_block)
}
pub fn verify_blob(
&mut self,
blob: Option<Arc<BlobSidecar<T::EthSpec>>>,
failed_chains: &mut lru_cache::LRUTimeCache<Hash256>,
) -> Result<Option<RootBlobsTuple<T::EthSpec>>, ParentVerifyError> {
let parent_root_opt = blob.as_ref().map(|b| b.block_parent_root);
let blobs = self.current_parent_request.verify_blob(blob)?;
// check if the parent of this block isn't in the failed cache. If it is, this chain should
// be dropped and the peer downscored.
if let Some(parent_root) = parent_root_opt {
if failed_chains.contains(&parent_root) {
self.current_parent_request
.blob_request_state
.state
.register_failure_downloading();
self.current_parent_request.id.blob_request_id = None;
return Err(ParentVerifyError::PreviousFailure { parent_root });
}
}
Ok(blobs)
}
pub fn add_peers(&mut self, peer_source: &[PeerShouldHave]) {
self.current_parent_request.add_peers(peer_source)
}
pub fn used_peers(&self, response_type: ResponseType) -> impl Iterator<Item = &PeerId> + '_ {
match response_type {
ResponseType::Block => self
.current_parent_request
.block_request_state
.state
.used_peers
.iter(),
ResponseType::Blob => self
.current_parent_request
.blob_request_state
.state
.used_peers
.iter(),
}
pub fn used_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.current_parent_request
.block_request_state
.state
.used_peers
.iter()
.chain(
self.current_parent_request
.blob_request_state
.state
.used_peers
.iter(),
)
.unique()
}
}
@@ -371,6 +291,8 @@ impl From<LookupRequestError> for RequestError {
RequestError::TooManyAttempts { cannot_process }
}
E::NoPeers => RequestError::NoPeers,
E::AlreadyDownloaded => RequestError::AlreadyDownloaded,
E::SendFailed(msg) => RequestError::SendFailed(msg),
}
}
}
@@ -398,6 +320,7 @@ impl RequestError {
}
RequestError::TooManyAttempts { cannot_process: _ } => "too_many_downloading_attempts",
RequestError::NoPeers => "no_peers",
RequestError::AlreadyDownloaded => "already_downloaded",
}
}
}

View File

@@ -304,7 +304,7 @@ fn test_single_block_lookup_happy_path() {
// The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing.
bl.single_block_lookup_response(id, peer_id, Some(block.into()), D, &mut cx);
bl.single_lookup_response(id, peer_id, Some(block.into()), D, &mut cx);
rig.expect_empty_network();
rig.expect_block_process(response_type);
@@ -313,7 +313,7 @@ fn test_single_block_lookup_happy_path() {
// Send the stream termination. Peer should have not been penalized, and the request removed
// after processing.
bl.single_block_lookup_response(id, peer_id, None, D, &mut cx);
bl.single_lookup_response(id, peer_id, None, D, &mut cx);
bl.single_block_component_processed(
id,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)),
@@ -346,7 +346,7 @@ fn test_single_block_lookup_empty_response() {
}
// The peer does not have the block. It should be penalized.
bl.single_block_lookup_response(id, peer_id, None, D, &mut cx);
bl.single_lookup_response(id, peer_id, None, D, &mut cx);
rig.expect_penalty();
rig.expect_block_request(response_type); // it should be retried
@@ -375,12 +375,12 @@ fn test_single_block_lookup_wrong_response() {
// Peer sends something else. It should be penalized.
let bad_block = rig.rand_block(fork_name);
bl.single_block_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx);
bl.single_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx);
rig.expect_penalty();
rig.expect_block_request(response_type); // should be retried
// Send the stream termination. This should not produce an additional penalty.
bl.single_block_lookup_response(id, peer_id, None, D, &mut cx);
bl.single_lookup_response(id, peer_id, None, D, &mut cx);
rig.expect_empty_network();
}
@@ -438,7 +438,7 @@ fn test_single_block_lookup_becomes_parent_request() {
// The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing.
bl.single_block_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx);
bl.single_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx);
rig.expect_empty_network();
rig.expect_block_process(response_type);
@@ -971,7 +971,7 @@ fn test_single_block_lookup_ignored_response() {
// The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing.
bl.single_block_lookup_response(id, peer_id, Some(block.into()), D, &mut cx);
bl.single_lookup_response(id, peer_id, Some(block.into()), D, &mut cx);
rig.expect_empty_network();
rig.expect_block_process(response_type);
@@ -980,7 +980,7 @@ fn test_single_block_lookup_ignored_response() {
// Send the stream termination. Peer should have not been penalized, and the request removed
// after processing.
bl.single_block_lookup_response(id, peer_id, None, D, &mut cx);
bl.single_lookup_response(id, peer_id, None, D, &mut cx);
// Send an Ignored response, the request should be dropped
bl.single_block_component_processed(id, BlockProcessingResult::Ignored, response_type, &mut cx);
rig.expect_empty_network();
@@ -1353,7 +1353,7 @@ mod deneb_only {
fn block_response(mut self) -> Self {
// The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing.
self.bl.single_block_lookup_response(
self.bl.single_lookup_response(
self.block_req_id.expect("block request id"),
self.peer_id,
self.block.clone(),
@@ -1402,7 +1402,7 @@ mod deneb_only {
}
fn empty_block_response(mut self) -> Self {
self.bl.single_block_lookup_response(
self.bl.single_lookup_response(
self.block_req_id.expect("block request id"),
self.peer_id,
None,

View File

@@ -43,8 +43,9 @@ use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::delayed_lookup;
use crate::sync::block_lookups::delayed_lookup::DelayedLookupMessage;
pub use crate::sync::block_lookups::ResponseType;
use crate::sync::block_lookups::UnknownParentComponents;
use crate::sync::block_lookups::{
BlobRequestState, BlockRequestState, Current, Parent, RequestState, UnknownParentComponents,
};
use crate::sync::range_sync::ByRangeRequestType;
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::block_verification_types::RpcBlock;
@@ -83,27 +84,47 @@ pub const DELAY_QUEUE_CHANNEL_SIZE: usize = 128;
pub type Id = u32;
#[derive(Debug)]
pub enum ResponseType {
Block,
Blob,
}
/// Id of rpc requests sent by sync to the network.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum RequestId {
/// Request searching for a block given a hash.
SingleBlock { id: Id },
SingleBlock {
id: Id,
},
SingleBlob {
id: Id,
},
/// Request searching for a block's parent. The id is the chain
ParentLookup { id: Id },
ParentLookup {
id: Id,
},
ParentLookupBlob {
id: Id,
},
/// Request was from the backfill sync algorithm.
BackFillBlocks { id: Id },
BackFillBlocks {
id: Id,
},
/// Backfill request that is composed by both a block range request and a blob range request.
BackFillBlockAndBlobs { id: Id },
BackFillBlockAndBlobs {
id: Id,
},
/// The request was from a chain in the range sync algorithm.
RangeBlocks { id: Id },
RangeBlocks {
id: Id,
},
/// Range request that is composed by both a block range request and a blob range request.
RangeBlockAndBlobs { id: Id },
RangeBlockAndBlobs {
id: Id,
},
}
// TODO(diva) I'm updating functions what at a time, but this should be revisited because I think
// some code paths that are split for blobs and blocks can be made just one after sync as a whole
// is updated.
#[derive(Debug)]
/// A message that can be sent to the sync manager thread.
pub enum SyncMessage<T: EthSpec> {
@@ -174,6 +195,7 @@ pub enum SyncMessage<T: EthSpec> {
#[derive(Debug, Clone)]
pub enum BlockProcessType {
SingleBlock { id: Id },
SingleBlob { id: Id },
ParentLookup { chain_hash: Hash256 },
}
@@ -324,16 +346,40 @@ impl<T: BeaconChainTypes> SyncManager<T> {
trace!(self.log, "Sync manager received a failed RPC");
match request_id {
RequestId::SingleBlock { id } => {
self.block_lookups.single_block_lookup_failed(
id,
&peer_id,
&mut self.network,
error,
);
self.block_lookups
.single_block_lookup_failed::<BlockRequestState<Current>>(
id,
&peer_id,
&mut self.network,
error,
);
}
RequestId::SingleBlob { id } => {
self.block_lookups
.single_block_lookup_failed::<BlobRequestState<Current, T::EthSpec>>(
id,
&peer_id,
&mut self.network,
error,
);
}
RequestId::ParentLookup { id } => {
self.block_lookups
.parent_lookup_failed(id, peer_id, &mut self.network, error);
.parent_lookup_failed::<BlockRequestState<Parent>>(
id,
peer_id,
&mut self.network,
error,
);
}
RequestId::ParentLookupBlob { id } => {
self.block_lookups
.parent_lookup_failed::<BlobRequestState<Parent, T::EthSpec>>(
id,
peer_id,
&mut self.network,
error,
);
}
RequestId::BackFillBlocks { id } => {
if let Some(batch_id) = self
@@ -652,8 +698,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// If we are not synced, ignore this block.
if self.synced_and_connected(&peer_id) {
if self.should_delay_lookup(slot) {
self.block_lookups
.search_block_delayed(block_root, PeerShouldHave::Neither(peer_id));
self.block_lookups.search_block_delayed(
block_root,
PeerShouldHave::Neither(peer_id),
&self.network,
);
if let Err(e) = self
.delayed_lookups
.try_send(DelayedLookupMessage::MissingComponents(block_root))
@@ -695,7 +744,18 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} => match process_type {
BlockProcessType::SingleBlock { id } => self
.block_lookups
.single_block_component_processed(id, result, response_type, &mut self.network),
.single_block_component_processed::<BlockRequestState<Current>>(
id,
result,
&self.network,
),
BlockProcessType::SingleBlob { id } => self
.block_lookups
.single_block_component_processed::<BlobRequestState<Current, T::EthSpec>>(
id,
result,
&self.network,
),
BlockProcessType::ParentLookup { chain_hash } => self
.block_lookups
.parent_block_processed(chain_hash, result, response_type, &mut self.network),
@@ -753,6 +813,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
block_root,
parent_components,
&[PeerShouldHave::Neither(peer_id)],
&self.network,
);
if let Err(e) = self
.delayed_lookups
@@ -883,20 +944,30 @@ impl<T: BeaconChainTypes> SyncManager<T> {
seen_timestamp: Duration,
) {
match request_id {
RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response(
id,
peer_id,
block,
seen_timestamp,
&mut self.network,
),
RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response(
id,
peer_id,
block,
seen_timestamp,
&mut self.network,
),
RequestId::SingleBlock { id } => self
.block_lookups
.single_lookup_response::<BlockRequestState<Current>>(
id,
peer_id,
block,
seen_timestamp,
&self.network,
),
RequestId::SingleBlob { id } => {
crit!(self.log, "Blob received during block request"; "peer_id" => %peer_id );
}
RequestId::ParentLookup { id } => self
.block_lookups
.parent_lookup_response::<BlockRequestState<Parent>>(
id,
peer_id,
block,
seen_timestamp,
&self.network,
),
RequestId::ParentLookupBlob { id } => {
crit!(self.log, "Blob received during parent block request"; "peer_id" => %peer_id );
}
RequestId::BackFillBlocks { id } => {
let is_stream_terminator = block.is_none();
if let Some(batch_id) = self
@@ -954,20 +1025,31 @@ impl<T: BeaconChainTypes> SyncManager<T> {
seen_timestamp: Duration,
) {
match request_id {
RequestId::SingleBlock { id } => self.block_lookups.single_blob_lookup_response(
id,
peer_id,
blob,
seen_timestamp,
&mut self.network,
),
RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_blob_response(
id,
peer_id,
blob,
seen_timestamp,
&mut self.network,
),
RequestId::SingleBlock { id } => {
crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id );
}
RequestId::SingleBlob { id } => self
.block_lookups
.single_lookup_response::<BlobRequestState<Current, T::EthSpec>>(
id,
peer_id,
blob,
seen_timestamp,
&mut self.network,
),
RequestId::ParentLookup { id } => {
crit!(self.log, "Single blob received during parent block request"; "peer_id" => %peer_id );
}
RequestId::ParentLookupBlob { id } => self
.block_lookups
.parent_lookup_response::<BlobRequestState<Parent, T::EthSpec>>(
id,
peer_id,
blob,
seen_timestamp,
&mut self.network,
),
RequestId::BackFillBlocks { id: _ } => {
crit!(self.log, "Blob received during backfill block request"; "peer_id" => %peer_id );
}

View File

@@ -7,7 +7,7 @@ use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::service::{NetworkMessage, RequestId};
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::{BlobRequestId, BlockRequestId};
use crate::sync::block_lookups::LookupType;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
use fnv::FnvHashMap;
@@ -37,7 +37,7 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
/// A sequential ID for all RPC requests.
request_id: Id,
request_id: std::cell::Cell<Id>,
/// BlocksByRange requests made by the range syncing algorithm.
range_requests: FnvHashMap<Id, (ChainId, BatchId)>,
@@ -62,7 +62,7 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
pub chain: Arc<BeaconChain<T>>,
/// Logger for the `SyncNetworkContext`.
log: slog::Logger,
pub log: slog::Logger,
}
/// Small enumeration to make dealing with block and blob requests easier.
@@ -93,7 +93,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
SyncNetworkContext {
network_send,
execution_engine_state: EngineState::Online, // always assume `Online` at the start
request_id: 1,
request_id: std::cell::Cell::new(1),
range_requests: FnvHashMap::default(),
backfill_requests: FnvHashMap::default(),
range_blocks_and_blobs_requests: FnvHashMap::default(),
@@ -118,11 +118,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.unwrap_or_default()
}
pub fn status_peers<C: ToStatusMessage>(
&mut self,
chain: &C,
peers: impl Iterator<Item = PeerId>,
) {
pub fn status_peers<C: ToStatusMessage>(&self, chain: &C, peers: impl Iterator<Item = PeerId>) {
let status_message = chain.status_message();
for peer_id in peers {
debug!(
@@ -409,20 +405,39 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
/// Sends a blocks by root request for a parent request.
pub fn single_block_lookup_request(
&mut self,
pub fn single_lookup_request(
&self,
id: Id,
peer_id: PeerId,
request: BlocksByRootRequest,
) -> Result<Id, &'static str> {
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::SingleBlock { id });
blob_peer_id: PeerId,
blob_request: BlobsByRootRequest,
lookup_type: LookupType,
) -> Result<(), &'static str> {
self.single_block_lookup_request_retry(id, peer_id, request, lookup_type)?;
self.single_blob_lookup_request_retry(id, blob_peer_id, blob_request, lookup_type)?;
Ok(())
}
pub fn single_block_lookup_request_retry(
&self,
id: Id,
peer_id: PeerId,
request: BlocksByRootRequest,
lookup_type: LookupType,
) -> Result<(), &'static str> {
let sync_id = match lookup_type {
LookupType::Current => SyncRequestId::SingleBlock { id },
LookupType::Parent => SyncRequestId::ParentLookup { id },
};
let request_id = RequestId::Sync(sync_id);
trace!(
self.log,
"Sending BlocksByRoot Request";
"method" => "BlocksByRoot",
"count" => request.block_roots().len(),
"peer" => %peer_id
"peer" => %peer_id,
"lookup_type" => ?lookup_type
);
self.send_network_msg(NetworkMessage::SendRequest {
@@ -430,82 +445,34 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request: Request::BlocksByRoot(request),
request_id,
})?;
Ok(id)
Ok(())
}
/// Sends a blobs by root request for a parent request.
pub fn single_blobs_lookup_request(
&mut self,
peer_id: PeerId,
request: BlobsByRootRequest,
) -> Result<Id, &'static str> {
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::SingleBlock { id });
pub fn single_blob_lookup_request_retry(
&self,
id: Id,
blob_peer_id: PeerId,
blob_request: BlobsByRootRequest,
lookup_type: LookupType,
) -> Result<(), &'static str> {
let request_id = RequestId::Sync(SyncRequestId::SingleBlob { id });
trace!(
self.log,
"Sending BlobsByRoot Request";
"method" => "BlobsByRoot",
"count" => request.blob_ids.len(),
"peer" => %peer_id
);
if !blob_request.blob_ids.is_empty() {
trace!(
self.log,
"Sending BlobsByRoot Request";
"method" => "BlobsByRoot",
"count" => blob_request.blob_ids.len(),
"peer" => %blob_peer_id
);
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: Request::BlobsByRoot(request),
request_id,
})?;
Ok(id)
}
/// Sends a blocks by root request for a parent request.
pub fn parent_lookup_block_request(
&mut self,
peer_id: PeerId,
request: BlocksByRootRequest,
) -> Result<BlockRequestId, &'static str> {
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::ParentLookup { id });
trace!(
self.log,
"Sending parent BlocksByRoot Request";
"method" => "BlocksByRoot",
"count" => request.block_roots().len(),
"peer" => %peer_id
);
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: Request::BlocksByRoot(request),
request_id,
})?;
Ok(id)
}
/// Sends a blocks by root request for a parent request.
pub fn parent_lookup_blobs_request(
&mut self,
peer_id: PeerId,
request: BlobsByRootRequest,
) -> Result<BlobRequestId, &'static str> {
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::ParentLookup { id });
trace!(
self.log,
"Sending parent BlobsByRoot Request";
"method" => "BlobsByRoot",
"count" => request.blob_ids.len(),
"peer" => %peer_id
);
self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: Request::BlobsByRoot(request),
request_id,
})?;
Ok(id)
self.send_network_msg(NetworkMessage::SendRequest {
peer_id: blob_peer_id,
request: Request::BlobsByRoot(blob_request),
request_id,
})?;
}
Ok(())
}
pub fn is_execution_engine_online(&self) -> bool {
@@ -532,7 +499,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
/// Reports to the scoring algorithm the behaviour of a peer.
pub fn report_peer(&mut self, peer_id: PeerId, action: PeerAction, msg: &'static str) {
pub fn report_peer(&self, peer_id: PeerId, action: PeerAction, msg: &'static str) {
debug!(self.log, "Sync reporting peer"; "peer_id" => %peer_id, "action" => %action);
self.network_send
.send(NetworkMessage::ReportPeer {
@@ -547,7 +514,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
/// Subscribes to core topics.
pub fn subscribe_core_topics(&mut self) {
pub fn subscribe_core_topics(&self) {
self.network_send
.send(NetworkMessage::SubscribeCoreTopics)
.unwrap_or_else(|e| {
@@ -556,7 +523,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
/// Sends an arbitrary network message.
fn send_network_msg(&mut self, msg: NetworkMessage<T::EthSpec>) -> Result<(), &'static str> {
fn send_network_msg(&self, msg: NetworkMessage<T::EthSpec>) -> Result<(), &'static str> {
self.network_send.send(msg).map_err(|_| {
debug!(self.log, "Could not send message to the network service");
"Network channel send Failed"
@@ -572,10 +539,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&self.network_beacon_processor
}
fn next_id(&mut self) -> Id {
let id = self.request_id;
self.request_id += 1;
id
pub fn next_id(&self) -> Id {
let current_value = self.request_id.get();
self.request_id.set(current_value + 1);
current_value
}
/// Check whether a batch for this epoch (and only this epoch) should request just blocks or
@@ -587,25 +554,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
const _: () = assert!(
super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1
&& super::range_sync::EPOCHS_PER_BATCH == 1,
"To deal with alignment with 4844 boundaries, batches need to be of just one epoch"
"To deal with alignment with deneb boundaries, batches need to be of just one epoch"
);
#[cfg(test)]
{
// Keep tests only for blocks.
ByRangeRequestType::Blocks
}
#[cfg(not(test))]
{
if let Some(data_availability_boundary) = self.chain.data_availability_boundary() {
if epoch >= data_availability_boundary {
ByRangeRequestType::BlocksAndBlobs
} else {
ByRangeRequestType::Blocks
}
if let Some(data_availability_boundary) = self.chain.data_availability_boundary() {
if epoch >= data_availability_boundary {
ByRangeRequestType::BlocksAndBlobs
} else {
ByRangeRequestType::Blocks
}
} else {
ByRangeRequestType::Blocks
}
}
}