fix compilation in main block lookup mod

This commit is contained in:
realbigsean
2023-04-19 14:02:41 -04:00
parent 195d802931
commit 0ad9fdfbbf
9 changed files with 784 additions and 411 deletions

View File

@@ -1,5 +1,6 @@
use derivative::Derivative; use derivative::Derivative;
use slot_clock::SlotClock; use slot_clock::SlotClock;
use ssz_types::FixedVector;
use std::sync::Arc; use std::sync::Arc;
use crate::beacon_chain::{ use crate::beacon_chain::{
@@ -14,8 +15,8 @@ use crate::BeaconChainError;
use kzg::Kzg; use kzg::Kzg;
use types::blob_sidecar::BlobIdentifier; use types::blob_sidecar::BlobIdentifier;
use types::{ use types::{
BeaconBlockRef, BeaconStateError, BlobSidecar, BlobSidecarList, Epoch, EthSpec, Hash256, BeaconBlockRef, BeaconStateError, BlobSidecar, Epoch, EthSpec, Hash256, KzgCommitment,
KzgCommitment, SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot, SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot,
}; };
#[derive(Debug)] #[derive(Debug)]
@@ -475,11 +476,19 @@ impl<E: EthSpec> AsBlock<E> for &MaybeAvailableBlock<E> {
#[derivative(Hash(bound = "E: EthSpec"))] #[derivative(Hash(bound = "E: EthSpec"))]
pub enum BlockWrapper<E: EthSpec> { pub enum BlockWrapper<E: EthSpec> {
Block(Arc<SignedBeaconBlock<E>>), Block(Arc<SignedBeaconBlock<E>>),
BlockAndBlobs(Arc<SignedBeaconBlock<E>>, Vec<Arc<BlobSidecar<E>>>), BlockAndBlobs(
Arc<SignedBeaconBlock<E>>,
FixedVector<Option<Arc<BlobSidecar<E>>>, E::MaxBlobsPerBlock>,
),
} }
impl<E: EthSpec> BlockWrapper<E> { impl<E: EthSpec> BlockWrapper<E> {
pub fn deconstruct(self) -> (Arc<SignedBeaconBlock<E>>, Option<Vec<Arc<BlobSidecar<E>>>>) { pub fn deconstruct(
self,
) -> (
Arc<SignedBeaconBlock<E>>,
Option<FixedVector<Option<Arc<BlobSidecar<E>>>, E::MaxBlobsPerBlock>>,
) {
match self { match self {
BlockWrapper::Block(block) => (block, None), BlockWrapper::Block(block) => (block, None),
BlockWrapper::BlockAndBlobs(block, blobs) => (block, Some(blobs)), BlockWrapper::BlockAndBlobs(block, blobs) => (block, Some(blobs)),

View File

@@ -46,6 +46,10 @@ pub enum AvailabilityCheckError {
block_root: Hash256, block_root: Hash256,
blob_block_root: Hash256, blob_block_root: Hash256,
}, },
UnorderedBlobs {
expected_index: u64,
blob_index: u64,
},
} }
impl From<ssz_types::Error> for AvailabilityCheckError { impl From<ssz_types::Error> for AvailabilityCheckError {
@@ -136,11 +140,11 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
} }
} }
pub fn zip_block( pub fn wrap_block(
&self, &self,
block_root: Hash256, block_root: Hash256,
block: Arc<SignedBeaconBlock<T>>, block: Arc<SignedBeaconBlock<T>>,
blobs: Vec<Arc<BlobSidecar<T>>>, blobs: FixedVector<Option<Arc<BlobSidecar<T>>>, T::MaxBlobsPerBlock>,
) -> Result<BlockWrapper<T>, AvailabilityCheckError> { ) -> Result<BlockWrapper<T>, AvailabilityCheckError> {
Ok(match self.get_blob_requirements(&block)? { Ok(match self.get_blob_requirements(&block)? {
BlobRequirements::EmptyBlobs => BlockWrapper::Block(block), BlobRequirements::EmptyBlobs => BlockWrapper::Block(block),
@@ -153,32 +157,29 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
.blob_kzg_commitments() .blob_kzg_commitments()
.map(|commitments| commitments.len()) .map(|commitments| commitments.len())
.unwrap_or(0); .unwrap_or(0);
let mut expected_indices: HashSet<usize> =
(0..expected_num_blobs).into_iter().collect(); let mut blob_count = 0;
if blobs.len() < expected_num_blobs { while let Some((index, Some(blob))) = blobs.iter().enumerate().next() {
return Err(AvailabilityCheckError::NumBlobsMismatch { blob_count += 1;
num_kzg_commitments: expected_num_blobs,
num_blobs: blobs.len(),
});
}
for blob in blobs.iter() {
if blob.block_root != block_root { if blob.block_root != block_root {
return Err(AvailabilityCheckError::BlockBlobRootMismatch { return Err(AvailabilityCheckError::BlockBlobRootMismatch {
block_root, block_root,
blob_block_root: blob.block_root, blob_block_root: blob.block_root,
}); });
} }
let removed = expected_indices.remove(&(blob.index as usize));
if !removed { let expected_index = index as u64;
if expected_index != blob.index {
return Err(AvailabilityCheckError::UnorderedBlobs {
expected_index,
blob_index: blob.index,
});
}
}
if blob_count < expected_num_blobs {
return Err(AvailabilityCheckError::MissingBlobs); return Err(AvailabilityCheckError::MissingBlobs);
} }
}
if !expected_indices.is_empty() {
return Err(AvailabilityCheckError::DuplicateBlob(block_root));
}
//TODO(sean) do we re-order blobs here to the correct order?
BlockWrapper::BlockAndBlobs(block, blobs) BlockWrapper::BlockAndBlobs(block, blobs)
} }
@@ -199,8 +200,10 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
pub fn put_rpc_blobs( pub fn put_rpc_blobs(
&self, &self,
block_root: Hash256, block_root: Hash256,
blobs: Vec<Arc<BlobSidecar<T>>>, blobs: FixedVector<Option<Arc<BlobSidecar<T>>>, T::MaxBlobsPerBlock>,
) -> Result<Availability<T>, AvailabilityCheckError> { ) -> Result<Availability<T>, AvailabilityCheckError> {
//TODO(sean) merge with existing blobs, only kzg verify blobs we haven't yet verified
// Verify the KZG commitment. // Verify the KZG commitment.
let kzg_verified_blobs = if let Some(kzg) = self.kzg.as_ref() { let kzg_verified_blobs = if let Some(kzg) = self.kzg.as_ref() {
verify_kzg_for_blob_list(blobs, kzg)? verify_kzg_for_blob_list(blobs, kzg)?

View File

@@ -54,6 +54,7 @@ use lighthouse_network::{
}; };
use logging::TimeLatch; use logging::TimeLatch;
use slog::{crit, debug, error, trace, warn, Logger}; use slog::{crit, debug, error, trace, warn, Logger};
use ssz_types::FixedVector;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
@@ -631,7 +632,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
pub fn rpc_blobs( pub fn rpc_blobs(
block_root: Hash256, block_root: Hash256,
blobs: Vec<Arc<BlobSidecar<T::EthSpec>>>, blobs: FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>,
seen_timestamp: Duration, seen_timestamp: Duration,
process_type: BlockProcessType, process_type: BlockProcessType,
) -> Self { ) -> Self {
@@ -947,7 +948,7 @@ pub enum Work<T: BeaconChainTypes> {
}, },
RpcBlobs { RpcBlobs {
block_root: Hash256, block_root: Hash256,
blobs: Vec<Arc<BlobSidecar<T::EthSpec>>>, blobs: FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>,
seen_timestamp: Duration, seen_timestamp: Duration,
process_type: BlockProcessType, process_type: BlockProcessType,
}, },

View File

@@ -16,6 +16,7 @@ use beacon_chain::{
}; };
use lighthouse_network::PeerAction; use lighthouse_network::PeerAction;
use slog::{debug, error, info, warn}; use slog::{debug, error, info, warn};
use ssz_types::FixedVector;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use types::{BlobSidecar, Epoch, Hash256, SignedBeaconBlock}; use types::{BlobSidecar, Epoch, Hash256, SignedBeaconBlock};
@@ -54,9 +55,10 @@ impl<T: BeaconChainTypes> Worker<T> {
) { ) {
if !should_process { if !should_process {
// Sync handles these results // Sync handles these results
self.send_sync_message(SyncMessage::BlockOrBlobProcessed { self.send_sync_message(SyncMessage::BlockPartProcessed {
process_type, process_type,
result: crate::sync::manager::BlockOrBlobProcessResult::Ignored, result: crate::sync::manager::BlockPartProcessingResult::Ignored,
response_type: crate::sync::manager::ResponseType::Block,
}); });
return; return;
} }
@@ -129,9 +131,10 @@ impl<T: BeaconChainTypes> Worker<T> {
} }
} }
// Sync handles these results // Sync handles these results
self.send_sync_message(SyncMessage::BlockOrBlobProcessed { self.send_sync_message(SyncMessage::BlockPartProcessed {
process_type, process_type,
result: result.into(), result: result.into(),
response_type: ResponseType::Block,
}); });
// Drop the handle to remove the entry from the cache // Drop the handle to remove the entry from the cache
@@ -141,7 +144,7 @@ impl<T: BeaconChainTypes> Worker<T> {
pub async fn process_rpc_blobs( pub async fn process_rpc_blobs(
self, self,
block_root: Hash256, block_root: Hash256,
blobs: Vec<Arc<BlobSidecar<T::EthSpec>>>, blobs: FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>,
seen_timestamp: Duration, seen_timestamp: Duration,
process_type: BlockProcessType, process_type: BlockProcessType,
) { ) {
@@ -158,9 +161,10 @@ impl<T: BeaconChainTypes> Worker<T> {
.await; .await;
// Sync handles these results // Sync handles these results
self.send_sync_message(SyncMessage::BlockOrBlobProcessed { self.send_sync_message(SyncMessage::BlockPartProcessed {
process_type, process_type,
result: result.into(), result: result.into(),
response_type: ResponseType::Blobs,
}); });
} }

File diff suppressed because it is too large Load Diff

View File

@@ -1,17 +1,17 @@
use super::DownlodedBlocks; use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup};
use crate::sync::block_lookups::single_block_lookup::{RequestableThing, SingleBlobsRequest}; use super::DownloadedBlocks;
use crate::sync::block_lookups::RootBlockTuple; use crate::sync::block_lookups::{single_block_lookup, RootBlockTuple};
use crate::sync::manager::BlockProcessType;
use crate::sync::{ use crate::sync::{
manager::{Id, SLOT_IMPORT_TOLERANCE}, manager::{Id, SLOT_IMPORT_TOLERANCE},
network_context::SyncNetworkContext, network_context::SyncNetworkContext,
}; };
use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock};
use beacon_chain::data_availability_checker::{AvailableBlock, DataAvailabilityChecker}; use beacon_chain::data_availability_checker::AvailabilityCheckError;
use beacon_chain::data_availability_checker::DataAvailabilityChecker;
use beacon_chain::BeaconChainTypes; use beacon_chain::BeaconChainTypes;
use lighthouse_network::libp2p::core::either::EitherName::A;
use lighthouse_network::PeerId; use lighthouse_network::PeerId;
use ssz_types::FixedVector;
use std::iter; use std::iter;
use std::sync::Arc; use std::sync::Arc;
use store::Hash256; use store::Hash256;
@@ -19,8 +19,6 @@ use strum::IntoStaticStr;
use types::blob_sidecar::BlobIdentifier; use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
use super::single_block_lookup::{self, SingleBlockLookup};
/// How many attempts we try to find a parent of a block before we give up trying. /// How many attempts we try to find a parent of a block before we give up trying.
pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5; pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5;
/// The maximum depth we will search for a parent block. In principle we should have sync'd any /// The maximum depth we will search for a parent block. In principle we should have sync'd any
@@ -33,7 +31,7 @@ pub(crate) struct ParentLookup<T: BeaconChainTypes> {
/// The root of the block triggering this parent request. /// The root of the block triggering this parent request.
chain_hash: Hash256, chain_hash: Hash256,
/// The blocks that have currently been downloaded. /// The blocks that have currently been downloaded.
downloaded_blocks: Vec<DownlodedBlocks<T::EthSpec>>, downloaded_blocks: Vec<DownloadedBlocks<T::EthSpec>>,
/// Request of the last parent. /// Request of the last parent.
pub current_parent_request: SingleBlockLookup<PARENT_FAIL_TOLERANCE, T>, pub current_parent_request: SingleBlockLookup<PARENT_FAIL_TOLERANCE, T>,
/// Id of the last parent request. /// Id of the last parent request.
@@ -42,11 +40,15 @@ pub(crate) struct ParentLookup<T: BeaconChainTypes> {
} }
#[derive(Debug, PartialEq, Eq, IntoStaticStr)] #[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum VerifyError { pub enum ParentVerifyError {
RootMismatch, RootMismatch,
NoBlockReturned, NoBlockReturned,
ExtraBlocksReturned, ExtraBlocksReturned,
UnrequestedBlobId,
ExtraBlobsReturned,
InvalidIndex(u64),
PreviousFailure { parent_root: Hash256 }, PreviousFailure { parent_root: Hash256 },
AvailabilityCheck(AvailabilityCheckError),
} }
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
@@ -62,7 +64,7 @@ pub enum RequestError {
NoPeers, NoPeers,
} }
pub enum RequestResult<T: EthSpec> { pub enum LookupDownloadStatus<T: EthSpec> {
Process(BlockWrapper<T>), Process(BlockWrapper<T>),
SearchBlock(Hash256), SearchBlock(Hash256),
} }
@@ -76,11 +78,11 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
pub fn new( pub fn new(
block_root: Hash256, block_root: Hash256,
parent_root: Hash256,
peer_id: PeerId, peer_id: PeerId,
da_checker: Arc<DataAvailabilityChecker<T::EthSpec, T::SlotClock>>, da_checker: Arc<DataAvailabilityChecker<T::EthSpec, T::SlotClock>>,
) -> Self { ) -> Self {
let current_parent_request = let current_parent_request = SingleBlockLookup::new(parent_root, peer_id, da_checker);
SingleBlockLookup::new(block.parent_root(), peer_id, da_checker);
Self { Self {
chain_hash: block_root, chain_hash: block_root,
@@ -141,31 +143,61 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
} }
pub fn check_block_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> { pub fn check_block_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> {
self.current_parent_request.check_peer_disconnected(peer_id) self.current_parent_request
.block_request_state
.check_peer_disconnected(peer_id)
} }
pub fn check_blob_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> { pub fn check_blob_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> {
self.current_parent_blob_request self.current_parent_request
.map(|mut req| req.check_peer_disconnected(peer_id)) .blob_request_state
.unwrap_or_default() .check_peer_disconnected(peer_id)
}
pub fn add_block_wrapper(&mut self, block: BlockWrapper<T::EthSpec>) {
let next_parent = block.parent_root();
let current_root = self.current_parent_request.requested_block_root;
self.downloaded_blocks.push((current_root, block));
self.current_parent_request.requested_block_root = next_parent;
let mut blob_ids = Vec::with_capacity(T::EthSpec::max_blobs_per_block());
for i in 0..T::EthSpec::max_blobs_per_block() {
blob_ids.push(BlobIdentifier {
block_root: current_root,
index: i as u64,
});
}
self.current_parent_request.requested_ids = blob_ids;
self.current_parent_request.block_request_state.state =
single_block_lookup::State::AwaitingDownload;
self.current_parent_request.blob_request_state.state =
single_block_lookup::State::AwaitingDownload;
self.current_parent_request_id = None;
self.current_parent_blob_request_id = None;
} }
pub fn add_block( pub fn add_block(
&mut self, &mut self,
block_root: Hash256, block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>, block: Arc<SignedBeaconBlock<T::EthSpec>>,
) -> RequestResult<T::EthSpec> { ) -> Result<LookupDownloadStatus<T::EthSpec>, ParentVerifyError> {
self.current_parent_request_id = None; self.current_parent_request_id = None;
self.current_parent_request.add_block(block_root, block) self.current_parent_request
.add_block(block_root, block)
.map_err(Into::into)
} }
pub fn add_blobs( pub fn add_blobs(
&mut self, &mut self,
block_root: Hash256, block_root: Hash256,
blobs: Vec<Arc<BlobSidecar<T::EthSpec>>>, blobs: FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>,
) -> RequestResult<T::EthSpec> { ) -> Result<LookupDownloadStatus<T::EthSpec>, ParentVerifyError> {
self.current_parent_blob_request_id = None; self.current_parent_blob_request_id = None;
self.current_parent_request.add_blobs(block_root, blobs) self.current_parent_request
.add_blobs(block_root, blobs)
.map_err(Into::into)
} }
pub fn pending_block_response(&self, req_id: Id) -> bool { pub fn pending_block_response(&self, req_id: Id) -> bool {
@@ -182,10 +214,9 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
self, self,
) -> ( ) -> (
Hash256, Hash256,
Vec<MaybeAvailableBlock<T::EthSpec>>, Vec<BlockWrapper<T::EthSpec>>,
Vec<Hash256>, Vec<Hash256>,
SingleBlockLookup<PARENT_FAIL_TOLERANCE, T>, SingleBlockLookup<PARENT_FAIL_TOLERANCE, T>,
Option<SingleBlobsRequest<PARENT_FAIL_TOLERANCE, T::EthSpec>>,
) { ) {
let ParentLookup { let ParentLookup {
chain_hash, chain_hash,
@@ -201,13 +232,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
blocks.push(block); blocks.push(block);
hashes.push(hash); hashes.push(hash);
} }
( (chain_hash, blocks, hashes, current_parent_request)
chain_hash,
blocks,
hashes,
current_parent_request,
current_parent_blob_request,
)
} }
/// Get the parent lookup's chain hash. /// Get the parent lookup's chain hash.
@@ -216,24 +241,30 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
} }
pub fn block_download_failed(&mut self) { pub fn block_download_failed(&mut self) {
self.current_parent_request.register_failure_downloading(); self.current_parent_request
self.current_parent_request_id = None; .block_request_state
} .register_failure_downloading();
pub fn block_processing_failed(&mut self) {
self.current_parent_request.register_failure_processing();
self.current_parent_request_id = None; self.current_parent_request_id = None;
} }
pub fn blob_download_failed(&mut self) { pub fn blob_download_failed(&mut self) {
self.current_parent_blob_request self.current_parent_request
.map(|mut req| req.register_failure_downloading()); .blob_request_state
.register_failure_downloading();
self.current_parent_blob_request_id = None; self.current_parent_blob_request_id = None;
} }
pub fn block_processing_failed(&mut self) {
self.current_parent_request
.block_request_state
.register_failure_processing();
self.current_parent_request_id = None;
}
pub fn blob_processing_failed(&mut self) { pub fn blob_processing_failed(&mut self) {
self.current_parent_blob_request self.current_parent_request
.map(|mut req| req.register_failure_processing()); .blob_request_state
.register_failure_processing();
self.current_parent_blob_request_id = None; self.current_parent_blob_request_id = None;
} }
@@ -243,7 +274,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
&mut self, &mut self,
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>, block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
failed_chains: &mut lru_cache::LRUTimeCache<Hash256>, failed_chains: &mut lru_cache::LRUTimeCache<Hash256>,
) -> Result<Option<RootBlockTuple<T::EthSpec>>, VerifyError> { ) -> Result<Option<RootBlockTuple<T::EthSpec>>, ParentVerifyError> {
let root_and_block = self.current_parent_request.verify_block(block)?; let root_and_block = self.current_parent_request.verify_block(block)?;
// check if the parent of this block isn't in the failed cache. If it is, this chain should // check if the parent of this block isn't in the failed cache. If it is, this chain should
@@ -257,7 +288,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
.block_request_state .block_request_state
.register_failure_downloading(); .register_failure_downloading();
self.current_parent_request_id = None; self.current_parent_request_id = None;
return Err(VerifyError::PreviousFailure { parent_root }); return Err(ParentVerifyError::PreviousFailure { parent_root });
} }
} }
@@ -268,7 +299,14 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
&mut self, &mut self,
blob: Option<Arc<BlobSidecar<T::EthSpec>>>, blob: Option<Arc<BlobSidecar<T::EthSpec>>>,
failed_chains: &mut lru_cache::LRUTimeCache<Hash256>, failed_chains: &mut lru_cache::LRUTimeCache<Hash256>,
) -> Result<Option<Vec<Arc<BlobSidecar<T::EthSpec>>>>, VerifyError> { ) -> Result<
Option<(
Hash256,
FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>,
)>,
ParentVerifyError,
> {
let block_root = self.current_parent_request.requested_block_root;
let blobs = self.current_parent_request.verify_blob(blob)?; let blobs = self.current_parent_request.verify_blob(blob)?;
// check if the parent of this block isn't in the failed cache. If it is, this chain should // check if the parent of this block isn't in the failed cache. If it is, this chain should
@@ -276,23 +314,38 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
if let Some(parent_root) = blobs if let Some(parent_root) = blobs
.as_ref() .as_ref()
.and_then(|blobs| blobs.first()) .and_then(|blobs| blobs.first())
.map(|blob| blob.block_parent_root) .map(|blob| blob.block_parent_root_id)
{ {
if failed_chains.contains(&parent_root) { if failed_chains.contains(&parent_root) {
self.current_parent_request self.current_parent_request
.blob_request_state .blob_request_state
.register_failure_downloading(); .register_failure_downloading();
self.current_parent_blob_request_id = None; self.current_parent_blob_request_id = None;
return Err(VerifyError::PreviousFailure { parent_root }); return Err(ParentVerifyError::PreviousFailure { parent_root });
} }
} }
Ok(blobs) Ok((block_root, blobs))
} }
pub fn get_block_processing_peer(&self, chain_hash: Hash256) -> Option<PeerId> { pub fn get_block_processing_peer(&self, chain_hash: Hash256) -> Option<PeerId> {
if self.chain_hash == chain_hash { if self.chain_hash == chain_hash {
return self.current_parent_request.processing_peer().ok(); return self
.current_parent_request
.block_request_state
.processing_peer()
.ok();
}
None
}
pub fn get_blob_processing_peer(&self, chain_hash: Hash256) -> Option<PeerId> {
if self.chain_hash == chain_hash {
return self
.current_parent_request
.blob_request_state
.processing_peer()
.ok();
} }
None None
} }
@@ -310,15 +363,6 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
self.current_parent_request.used_peers.iter() self.current_parent_request.used_peers.iter()
} }
pub fn get_blob_processing_peer(&self, chain_hash: Hash256) -> Option<PeerId> {
if self.chain_hash == chain_hash {
return self
.current_parent_blob_request
.and_then(|req| req.processing_peer().ok());
}
None
}
#[cfg(test)] #[cfg(test)]
pub fn failed_blob_attempts(&self) -> u8 { pub fn failed_blob_attempts(&self) -> u8 {
self.current_parent_blob_request self.current_parent_blob_request
@@ -336,20 +380,24 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
} }
} }
impl From<super::single_block_lookup::VerifyError> for VerifyError { impl From<LookupVerifyError> for ParentVerifyError {
fn from(e: super::single_block_lookup::VerifyError) -> Self { fn from(e: LookupVerifyError) -> Self {
use super::single_block_lookup::VerifyError as E; use LookupVerifyError as E;
match e { match e {
E::RootMismatch => VerifyError::RootMismatch, E::RootMismatch => ParentVerifyError::RootMismatch,
E::NoBlockReturned => VerifyError::NoBlockReturned, E::NoBlockReturned => ParentVerifyError::NoBlockReturned,
E::ExtraBlocksReturned => VerifyError::ExtraBlocksReturned, E::ExtraBlocksReturned => ParentVerifyError::ExtraBlocksReturned,
E::UnrequestedBlobId => ParentVerifyError::UnrequestedBlobId,
E::ExtraBlobsReturned => ParentVerifyError::ExtraBlobsReturned,
E::InvalidIndex(index) => ParentVerifyError::InvalidIndex(index),
E::AvailabilityCheck(e) => ParentVerifyError::AvailabilityCheck(e),
} }
} }
} }
impl From<super::single_block_lookup::LookupRequestError> for RequestError { impl From<LookupRequestError> for RequestError {
fn from(e: super::single_block_lookup::LookupRequestError) -> Self { fn from(e: LookupRequestError) -> Self {
use super::single_block_lookup::LookupRequestError as E; use LookupRequestError as E;
match e { match e {
E::TooManyAttempts { cannot_process } => { E::TooManyAttempts { cannot_process } => {
RequestError::TooManyAttempts { cannot_process } RequestError::TooManyAttempts { cannot_process }

View File

@@ -1,18 +1,13 @@
use super::DownlodedBlocks; use crate::sync::block_lookups::parent_lookup::LookupDownloadStatus;
use crate::sync::block_lookups::parent_lookup::RequestResult;
use crate::sync::block_lookups::RootBlockTuple; use crate::sync::block_lookups::RootBlockTuple;
use crate::sync::manager::BlockProcessType; use beacon_chain::blob_verification::AsBlock;
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker};
use beacon_chain::data_availability_checker::{
AvailabilityCheckError, AvailabilityPendingBlock, DataAvailabilityChecker,
};
use beacon_chain::{get_block_root, BeaconChainTypes}; use beacon_chain::{get_block_root, BeaconChainTypes};
use lighthouse_network::rpc::methods::BlobsByRootRequest; use lighthouse_network::rpc::methods::BlobsByRootRequest;
use lighthouse_network::{rpc::BlocksByRootRequest, PeerId, Request}; use lighthouse_network::{rpc::BlocksByRootRequest, PeerId};
use rand::seq::IteratorRandom; use rand::seq::IteratorRandom;
use ssz_types::VariableList; use ssz_types::{FixedVector, VariableList};
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use store::{EthSpec, Hash256}; use store::{EthSpec, Hash256};
@@ -23,7 +18,8 @@ use types::{BlobSidecar, SignedBeaconBlock};
pub struct SingleBlockLookup<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> { pub struct SingleBlockLookup<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> {
pub requested_block_root: Hash256, pub requested_block_root: Hash256,
pub requested_ids: Vec<BlobIdentifier>, pub requested_ids: Vec<BlobIdentifier>,
pub downloaded_blobs: Vec<Option<Arc<BlobSidecar<T::EthSpec>>>>, pub downloaded_blobs:
FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>,
pub downloaded_block: Option<Arc<BlobSidecar<T::EthSpec>>>, pub downloaded_block: Option<Arc<BlobSidecar<T::EthSpec>>>,
pub block_request_state: SingleLookupRequestState<MAX_ATTEMPTS>, pub block_request_state: SingleLookupRequestState<MAX_ATTEMPTS>,
pub blob_request_state: SingleLookupRequestState<MAX_ATTEMPTS>, pub blob_request_state: SingleLookupRequestState<MAX_ATTEMPTS>,
@@ -55,16 +51,14 @@ pub enum State {
} }
#[derive(Debug, PartialEq, Eq, IntoStaticStr)] #[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum VerifyError { pub enum LookupVerifyError {
RootMismatch, RootMismatch,
NoBlockReturned, NoBlockReturned,
ExtraBlocksReturned, ExtraBlocksReturned,
}
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum BlobVerifyError {
UnrequestedBlobId, UnrequestedBlobId,
ExtraBlobsReturned, ExtraBlobsReturned,
InvalidIndex(u64),
AvailabilityCheck(AvailabilityCheckError),
} }
#[derive(Debug, PartialEq, Eq, IntoStaticStr)] #[derive(Debug, PartialEq, Eq, IntoStaticStr)]
@@ -85,9 +79,9 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
) -> Self { ) -> Self {
Self { Self {
requested_block_root, requested_block_root,
requested_ids: vec![], requested_ids: <_>::default(),
downloaded_block: None, downloaded_block: None,
downloaded_blobs: vec![], downloaded_blobs: <_>::default(),
block_request_state: SingleLookupRequestState::new(peer_id), block_request_state: SingleLookupRequestState::new(peer_id),
blob_request_state: SingleLookupRequestState::new(peer_id), blob_request_state: SingleLookupRequestState::new(peer_id),
da_checker, da_checker,
@@ -97,19 +91,25 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
pub fn add_blobs( pub fn add_blobs(
&mut self, &mut self,
block_root: Hash256, block_root: Hash256,
blobs: Vec<Arc<BlobSidecar<T::EthSpec>>>, blobs: FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>,
) -> RequestResult<T::EthSpec> { ) -> Result<LookupDownloadStatus<T::EthSpec>, LookupVerifyError> {
//TODO(sean) smart extend, we don't want dupes for (index, blob_opt) in self.downloaded_blobs.iter_mut().enumerate() {
self.downloaded_blobs.extend(blobs); if let Some(Some(downloaded_blob)) = blobs.get(index) {
//TODO(sean) should we log a warn if there is already a downloaded blob?
blob_opt = Some(downloaded_blob);
}
}
if let Some(block) = self.downloaded_block.as_ref() { if let Some(block) = self.downloaded_block.as_ref() {
match self.da_checker.zip_block(block_root, block, blobs) { match self.da_checker.wrap_block(block_root, block, blobs) {
Ok(wrapper) => RequestResult::Process(wrapper), Ok(wrapper) => Ok(LookupDownloadStatus::Process(wrapper)),
Err(AvailabilityCheckError::MissingBlobs) => RequestResult::SearchBlock(block_root), Err(AvailabilityCheckError::MissingBlobs) => {
_ => todo!(), Ok(LookupDownloadStatus::SearchBlock(block_root))
}
Err(e) => Err(LookupVerifyError::AvailabilityCheck(e)),
} }
} else { } else {
RequestResult::SearchBlock(block_hash) Ok(LookupDownloadStatus::SearchBlock(block_hash))
} }
} }
@@ -117,17 +117,23 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
&mut self, &mut self,
block_root: Hash256, block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>, block: Arc<SignedBeaconBlock<T::EthSpec>>,
) -> RequestResult<T::EthSpec> { ) -> Result<LookupDownloadStatus<T::EthSpec>, LookupVerifyError> {
//TODO(sean) check for existing block? for (index, blob_opt) in self.downloaded_blobs.iter_mut().enumerate() {
self.downloaded_block = Some(block); if let Some(Some(downloaded_blob)) = blobs.get(index) {
//TODO(sean) should we log a warn if there is already a downloaded blob?
*blob_opt = Some(downloaded_blob);
}
}
match self match self
.da_checker .da_checker
.zip_block(block_root, block, self.downloaded_blobs) .wrap_block(block_root, block, self.downloaded_blobs)
{ {
Ok(wrapper) => RequestResult::Process(wrapper), Ok(wrapper) => Ok(LookupDownloadStatus::Process(wrapper)),
Err(AvailabilityCheckError::MissingBlobs) => RequestResult::SearchBlock(block_root), Err(AvailabilityCheckError::MissingBlobs) => {
_ => todo!(), Ok(LookupDownloadStatus::SearchBlock(block_root))
}
Err(e) => LookupVerifyError::AvailabilityCheck(e),
} }
} }
@@ -135,7 +141,7 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
&mut self, &mut self,
block_root: Hash256, block_root: Hash256,
block: BlockWrapper<T::EthSpec>, block: BlockWrapper<T::EthSpec>,
) -> RequestResult<T::EthSpec> { ) -> Result<LookupDownloadStatus<T::EthSpec>, LookupVerifyError> {
match block { match block {
BlockWrapper::Block(block) => self.add_block(block_root, block), BlockWrapper::Block(block) => self.add_block(block_root, block),
BlockWrapper::BlockAndBlobs(block, blobs) => { BlockWrapper::BlockAndBlobs(block, blobs) => {
@@ -144,12 +150,6 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
self.add_blobs(block_root, blobs) self.add_blobs(block_root, blobs)
} }
} }
match self.da_checker.zip_block(block_root, block, blobs) {
Ok(wrapper) => RequestResult::Process(wrapper),
Err(AvailabilityCheckError::MissingBlobs) => RequestResult::SearchBlock(block_root),
_ => todo!(),
}
} }
/// Verifies if the received block matches the requested one. /// Verifies if the received block matches the requested one.
@@ -157,11 +157,11 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
pub fn verify_block( pub fn verify_block(
&mut self, &mut self,
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>, block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> Result<Option<RootBlockTuple<T::EthSpec>>, VerifyError> { ) -> Result<Option<RootBlockTuple<T::EthSpec>>, LookupVerifyError> {
match self.block_request_state.state { match self.block_request_state.state {
State::AwaitingDownload => { State::AwaitingDownload => {
self.block_request_state.register_failure_downloading(); self.block_request_state.register_failure_downloading();
Err(VerifyError::ExtraBlocksReturned) Err(LookupVerifyError::ExtraBlocksReturned)
} }
State::Downloading { peer_id } => match block { State::Downloading { peer_id } => match block {
Some(block) => { Some(block) => {
@@ -173,7 +173,7 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
// NOTE: we take this is as a download failure to prevent counting the // NOTE: we take this is as a download failure to prevent counting the
// attempt as a chain failure, but simply a peer failure. // attempt as a chain failure, but simply a peer failure.
self.block_request_state.register_failure_downloading(); self.block_request_state.register_failure_downloading();
Err(VerifyError::RootMismatch) Err(LookupVerifyError::RootMismatch)
} else { } else {
// Return the block for processing. // Return the block for processing.
self.block_request_state.state = State::Processing { peer_id }; self.block_request_state.state = State::Processing { peer_id };
@@ -182,14 +182,14 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
} }
None => { None => {
self.register_failure_downloading(); self.register_failure_downloading();
Err(VerifyError::NoBlockReturned) Err(LookupVerifyError::NoBlockReturned)
} }
}, },
State::Processing { peer_id: _ } => match block { State::Processing { peer_id: _ } => match block {
Some(_) => { Some(_) => {
// We sent the block for processing and received an extra block. // We sent the block for processing and received an extra block.
self.block_request_state.register_failure_downloading(); self.block_request_state.register_failure_downloading();
Err(VerifyError::ExtraBlocksReturned) Err(LookupVerifyError::ExtraBlocksReturned)
} }
None => { None => {
// This is simply the stream termination and we are already processing the // This is simply the stream termination and we are already processing the
@@ -203,7 +203,10 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
pub fn verify_blob( pub fn verify_blob(
&mut self, &mut self,
blob: Option<Arc<BlobSidecar<T::EthSpec>>>, blob: Option<Arc<BlobSidecar<T::EthSpec>>>,
) -> Result<Option<Vec<Arc<BlobSidecar<T::EthSpec>>>>, BlobVerifyError> { ) -> Result<
Option<FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>>,
BlobVerifyError,
> {
match self.block_request_state.state { match self.block_request_state.state {
State::AwaitingDownload => { State::AwaitingDownload => {
self.blob_request_state.register_failure_downloading(); self.blob_request_state.register_failure_downloading();
@@ -216,9 +219,13 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
self.blob_request_state.register_failure_downloading(); self.blob_request_state.register_failure_downloading();
Err(BlobVerifyError::UnrequestedBlobId) Err(BlobVerifyError::UnrequestedBlobId)
} else { } else {
// state should still be downloading // State should remain downloading until we receive the stream terminator.
self.requested_ids.retain(|id| id != received_id); self.requested_ids.retain(|id| id != received_id);
self.downloaded_blobs.push(blob) if let Some(blob_opt) = self.downloaded_blobs.get_mut(blob.index) {
*blob_opt = Some(blob);
} else {
return Err(BlobVerifyError::InvalidIndex(blob.index));
}
} }
} }
None => { None => {

View File

@@ -607,7 +607,7 @@ fn test_single_block_lookup_ignored_response() {
// after processing. // after processing.
bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); bl.single_block_lookup_response(id, peer_id, None, D, &mut cx);
// Send an Ignored response, the request should be dropped // Send an Ignored response, the request should be dropped
bl.single_block_processed(id, BlockOrBlobProcessResult::Ignored, &mut cx); bl.single_block_processed(id, BlockPartProcessingResult::Ignored, &mut cx);
rig.expect_empty_network(); rig.expect_empty_network();
assert_eq!(bl.single_block_lookups.len(), 0); assert_eq!(bl.single_block_lookups.len(), 0);
} }
@@ -631,7 +631,7 @@ fn test_parent_lookup_ignored_response() {
rig.expect_empty_network(); rig.expect_empty_network();
// Return an Ignored result. The request should be dropped // Return an Ignored result. The request should be dropped
bl.parent_block_processed(chain_hash, BlockOrBlobProcessResult::Ignored, &mut cx); bl.parent_block_processed(chain_hash, BlockPartProcessingResult::Ignored, &mut cx);
rig.expect_empty_network(); rig.expect_empty_network();
assert_eq!(bl.parent_lookups.len(), 0); assert_eq!(bl.parent_lookups.len(), 0);
} }

View File

@@ -41,6 +41,7 @@ use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
use crate::service::NetworkMessage; use crate::service::NetworkMessage;
use crate::status::ToStatusMessage; use crate::status::ToStatusMessage;
use crate::sync::block_lookups::ResponseType;
use crate::sync::range_sync::ByRangeRequestType; use crate::sync::range_sync::ByRangeRequestType;
use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock};
@@ -147,9 +148,10 @@ pub enum SyncMessage<T: EthSpec> {
}, },
/// Block processed /// Block processed
BlockOrBlobProcessed { BlockPartProcessed {
process_type: BlockProcessType, process_type: BlockProcessType,
result: BlockOrBlobProcessResult<T>, result: BlockPartProcessingResult<T>,
response_type: ResponseType,
}, },
} }
@@ -161,7 +163,7 @@ pub enum BlockProcessType {
} }
#[derive(Debug)] #[derive(Debug)]
pub enum BlockOrBlobProcessResult<T: EthSpec> { pub enum BlockPartProcessingResult<T: EthSpec> {
Ok(AvailabilityProcessingStatus), Ok(AvailabilityProcessingStatus),
Err(BlockError<T>), Err(BlockError<T>),
Ignored, Ignored,
@@ -649,9 +651,24 @@ impl<T: BeaconChainTypes> SyncManager<T> {
{ {
let parent_root = block.parent_root(); let parent_root = block.parent_root();
//TODO(sean) what about early blocks //TODO(sean) what about early blocks
let slot = match self.chain.slot_clock.now() {
Some(slot) => slot,
None => {
error!(
self.log,
"Could not read slot clock, dropping unknown block message"
);
return;
}
};
if block.slot() == self.chain.slot_clock.now() { if block.slot() == self.chain.slot_clock.now() {
self.delayed_lookups if let Err(e) = self
.send(SyncMessage::UnknownBlock(peer_id, block, block_root)); .delayed_lookups
.send(SyncMessage::UnknownBlock(peer_id, block, block_root))
{
warn!(self.log, "Delayed lookups receiver dropped for block"; "block_root" => block_hash);
}
} else { } else {
self.block_lookups.search_current_unknown_parent( self.block_lookups.search_current_unknown_parent(
block_root, block_root,
@@ -661,6 +678,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
); );
} }
self.block_lookups.search_parent( self.block_lookups.search_parent(
block.slot(),
block_root, block_root,
parent_root, parent_root,
peer_id, peer_id,
@@ -679,11 +697,27 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// If we are not synced, ignore this block. // If we are not synced, ignore this block.
if self.synced_and_connected(&peer_id) { if self.synced_and_connected(&peer_id) {
//TODO(sean) what about early gossip messages? //TODO(sean) what about early gossip messages?
if Some(slot) == self.chain.slot_clock.now() { let current_slot = match self.chain.slot_clock.now() {
Some(slot) => slot,
None => {
error!(
self.log,
"Could not read slot clock, dropping unknown block message"
);
return;
}
};
if slot == current_slot {
if let Err(e) =
self.delayed_lookups self.delayed_lookups
.send(SyncMessage::UnknownBlockHashFromAttestation( .send(SyncMessage::UnknownBlockHashFromAttestation(
peer_id, block_hash, peer_id, block_hash,
)) ))
{
warn!(self.log, "Delayed lookups receiver dropped for block referenced by a blob";
"block_root" => block_hash);
}
} else { } else {
self.block_lookups self.block_lookups
.search_block(block_hash, peer_id, &mut self.network) .search_block(block_hash, peer_id, &mut self.network)
@@ -698,17 +732,20 @@ impl<T: BeaconChainTypes> SyncManager<T> {
request_id, request_id,
error, error,
} => self.inject_error(peer_id, request_id, error), } => self.inject_error(peer_id, request_id, error),
SyncMessage::BlockOrBlobProcessed { SyncMessage::BlockPartProcessed {
process_type, process_type,
result, result,
response_type,
} => match process_type { } => match process_type {
BlockProcessType::SingleBlock { id } => { BlockProcessType::SingleBlock { id } => self.block_lookups.single_block_processed(
self.block_lookups id,
.single_block_processed(id, result, &mut self.network) result,
} response_type,
&mut self.network,
),
BlockProcessType::ParentLookup { chain_hash } => self BlockProcessType::ParentLookup { chain_hash } => self
.block_lookups .block_lookups
.parent_block_processed(chain_hash, result, &mut self.network), .parent_block_processed(chain_hash, result, response_type, &mut self.network),
}, },
SyncMessage::BatchProcessed { sync_type, result } => match sync_type { SyncMessage::BatchProcessed { sync_type, result } => match sync_type {
ChainSegmentProcessId::RangeBatchId(chain_id, epoch, _) => { ChainSegmentProcessId::RangeBatchId(chain_id, epoch, _) => {
@@ -1011,18 +1048,18 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} }
impl<T: EthSpec> From<Result<AvailabilityProcessingStatus, BlockError<T>>> impl<T: EthSpec> From<Result<AvailabilityProcessingStatus, BlockError<T>>>
for BlockOrBlobProcessResult<T> for BlockPartProcessingResult<T>
{ {
fn from(result: Result<AvailabilityProcessingStatus, BlockError<T>>) -> Self { fn from(result: Result<AvailabilityProcessingStatus, BlockError<T>>) -> Self {
match result { match result {
Ok(status) => BlockOrBlobProcessResult::Ok(status), Ok(status) => BlockPartProcessingResult::Ok(status),
Err(e) => BlockOrBlobProcessResult::Err(e), Err(e) => BlockPartProcessingResult::Err(e),
} }
} }
} }
impl<T: EthSpec> From<BlockError<T>> for BlockOrBlobProcessResult<T> { impl<T: EthSpec> From<BlockError<T>> for BlockPartProcessingResult<T> {
fn from(e: BlockError<T>) -> Self { fn from(e: BlockError<T>) -> Self {
BlockOrBlobProcessResult::Err(e) BlockPartProcessingResult::Err(e)
} }
} }