error refactoring

This commit is contained in:
realbigsean
2023-05-02 14:09:53 -04:00
parent 56b2365e17
commit e3f4218624
5 changed files with 58 additions and 69 deletions

View File

@@ -509,7 +509,9 @@ impl<T: EthSpec> TryInto<AvailableBlock<T>> for MaybeAvailableBlock<T> {
fn try_into(self) -> Result<AvailableBlock<T>, Self::Error> { fn try_into(self) -> Result<AvailableBlock<T>, Self::Error> {
match self { match self {
Self::Available(block) => Ok(block), Self::Available(block) => Ok(block),
Self::AvailabilityPending(_) => Err(AvailabilityCheckError::MissingBlobs), Self::AvailabilityPending(block) => Err(AvailabilityCheckError::MissingBlobs(
block.as_block().canonical_root(),
)),
} }
} }
} }

View File

@@ -13,6 +13,7 @@ use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments
use std::collections::hash_map::{Entry, OccupiedEntry}; use std::collections::hash_map::{Entry, OccupiedEntry};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use strum::IntoStaticStr;
use types::beacon_block_body::KzgCommitments; use types::beacon_block_body::KzgCommitments;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS; use types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS;
@@ -21,13 +22,13 @@ use types::{
SignedBeaconBlock, SignedBeaconBlockHeader, Slot, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
}; };
#[derive(Debug)] #[derive(Debug, IntoStaticStr)]
pub enum AvailabilityCheckError { pub enum AvailabilityCheckError {
Kzg(KzgError), Kzg(KzgError),
KzgVerificationFailed, KzgVerificationFailed,
KzgNotInitialized, KzgNotInitialized,
SszTypes(ssz_types::Error), SszTypes(ssz_types::Error),
MissingBlobs, MissingBlobs(Hash256),
NumBlobsMismatch { NumBlobsMismatch {
num_kzg_commitments: usize, num_kzg_commitments: usize,
num_blobs: usize, num_blobs: usize,
@@ -205,7 +206,7 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
} }
if blob_count < expected_num_blobs { if blob_count < expected_num_blobs {
return Err(AvailabilityCheckError::MissingBlobs); return Err(AvailabilityCheckError::MissingBlobs(block_root));
} }
BlockWrapper::BlockAndBlobs(block, blobs) BlockWrapper::BlockAndBlobs(block, blobs)
@@ -346,6 +347,7 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
) -> Result<Availability<T>, AvailabilityCheckError> { ) -> Result<Availability<T>, AvailabilityCheckError> {
if occupied_entry.get().has_all_blobs(&executed_block) { if occupied_entry.get().has_all_blobs(&executed_block) {
let num_blobs_expected = executed_block.num_blobs_expected(); let num_blobs_expected = executed_block.num_blobs_expected();
let block_root = executed_block.import_data.block_root;
let AvailabilityPendingExecutedBlock { let AvailabilityPendingExecutedBlock {
block, block,
import_data, import_data,
@@ -360,7 +362,9 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
let verified_blobs = Vec::from(verified_blobs) let verified_blobs = Vec::from(verified_blobs)
.into_iter() .into_iter()
.take(num_blobs_expected) .take(num_blobs_expected)
.map(|maybe_blob| maybe_blob.ok_or(AvailabilityCheckError::MissingBlobs)) .map(|maybe_blob| {
maybe_blob.ok_or(AvailabilityCheckError::MissingBlobs(block_root))
})
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
let available_block = self.make_available(block, verified_blobs)?; let available_block = self.make_available(block, verified_blobs)?;

View File

@@ -26,7 +26,6 @@ use super::{
}; };
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent};
use crate::metrics; use crate::metrics;
use crate::sync::block_lookups::single_block_lookup::LookupVerifyError;
mod parent_lookup; mod parent_lookup;
mod single_block_lookup; mod single_block_lookup;
@@ -315,9 +314,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let should_remove = match request_ref.verify_block(block) { let should_remove = match request_ref.verify_block(block) {
Ok(Some((root, block))) => { Ok(Some((root, block))) => {
if triggered_parent_request { if triggered_parent_request {
// The lookup status here is irrelevant because we wait until the parent chain if let LookupDownloadStatus::AvailabilityCheck(e) =
// is complete before processing the block. request_ref.add_block(root, block)
if let Err(e) = request_ref.add_block(root, block) { {
handle_block_lookup_verify_error( handle_block_lookup_verify_error(
request_id_ref, request_id_ref,
request_ref, request_ref,
@@ -387,9 +386,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let should_remove = match request_ref.verify_blob(blob) { let should_remove = match request_ref.verify_blob(blob) {
Ok(Some((block_root, blobs))) => { Ok(Some((block_root, blobs))) => {
if triggered_parent_request { if triggered_parent_request {
// The lookup status here is irrelevant because we wait until the parent chain if let LookupDownloadStatus::AvailabilityCheck(e) =
// is complete before processing the block. request_ref.add_blobs(block_root, blobs)
if let Err(e) = request_ref.add_blobs(block_root, blobs) { {
handle_block_lookup_verify_error( handle_block_lookup_verify_error(
request_id_ref, request_id_ref,
request_ref, request_ref,
@@ -514,9 +513,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
match parent_lookup.verify_block(block, &mut self.failed_chains) { match parent_lookup.verify_block(block, &mut self.failed_chains) {
Ok(Some((block_root, block))) => { Ok(Some((block_root, block))) => {
let process_or_search = parent_lookup.add_block(block_root, block); //TODO(sean) fix let process_or_search = parent_lookup.add_block(block_root, block);
match process_or_search { match process_or_search {
Ok(LookupDownloadStatus::Process(wrapper)) => { LookupDownloadStatus::Process(wrapper) => {
let chain_hash = parent_lookup.chain_hash(); let chain_hash = parent_lookup.chain_hash();
if self if self
.send_block_for_processing( .send_block_for_processing(
@@ -531,7 +530,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.parent_lookups.push(parent_lookup) self.parent_lookups.push(parent_lookup)
} }
} }
Ok(LookupDownloadStatus::SearchBlock(block_root)) => { LookupDownloadStatus::SearchBlock(block_root) => {
if let Some(peer_source) = if let Some(peer_source) =
parent_lookup.peer_source(ResponseType::Block, peer_id) parent_lookup.peer_source(ResponseType::Block, peer_id)
{ {
@@ -541,7 +540,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
warn!(self.log, "Response from untracked peer"; "peer_id" => %peer_id, "block_root" => ?block_root); warn!(self.log, "Response from untracked peer"; "peer_id" => %peer_id, "block_root" => ?block_root);
} }
} }
Err(e) => {} LookupDownloadStatus::AvailabilityCheck(e) => {}
} }
} }
Ok(None) => { Ok(None) => {
@@ -555,9 +554,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
| ParentVerifyError::ExtraBlocksReturned | ParentVerifyError::ExtraBlocksReturned
| ParentVerifyError::UnrequestedBlobId | ParentVerifyError::UnrequestedBlobId
| ParentVerifyError::ExtraBlobsReturned | ParentVerifyError::ExtraBlobsReturned
| ParentVerifyError::InvalidIndex(_) | ParentVerifyError::InvalidIndex(_) => {
//TODO(sean) treat this differently?
| ParentVerifyError::AvailabilityCheck(_) => {
let e = e.into(); let e = e.into();
warn!(self.log, "Peer sent invalid response to parent request."; warn!(self.log, "Peer sent invalid response to parent request.";
"peer_id" => %peer_id, "reason" => %e); "peer_id" => %peer_id, "reason" => %e);
@@ -616,7 +613,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
match parent_lookup.verify_blob(blob, &mut self.failed_chains) { match parent_lookup.verify_blob(blob, &mut self.failed_chains) {
Ok(Some((block_root, blobs))) => { Ok(Some((block_root, blobs))) => {
let processed_or_search = parent_lookup.add_blobs(block_root, blobs).unwrap(); //TODO(sean) fix let processed_or_search = parent_lookup.add_blobs(block_root, blobs);
match processed_or_search { match processed_or_search {
LookupDownloadStatus::Process(wrapper) => { LookupDownloadStatus::Process(wrapper) => {
let chain_hash = parent_lookup.chain_hash(); let chain_hash = parent_lookup.chain_hash();
@@ -643,6 +640,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
warn!(self.log, "Response from untracked peer"; "peer_id" => %peer_id, "block_root" => ?block_root); warn!(self.log, "Response from untracked peer"; "peer_id" => %peer_id, "block_root" => ?block_root);
} }
} }
LookupDownloadStatus::AvailabilityCheck(e) => {}
} }
} }
Ok(None) => { Ok(None) => {
@@ -656,9 +654,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
| ParentVerifyError::ExtraBlocksReturned | ParentVerifyError::ExtraBlocksReturned
| ParentVerifyError::UnrequestedBlobId | ParentVerifyError::UnrequestedBlobId
| ParentVerifyError::ExtraBlobsReturned | ParentVerifyError::ExtraBlobsReturned
| ParentVerifyError::InvalidIndex(_) | ParentVerifyError::InvalidIndex(_) => {
//TODO(sean) treat differently?
| ParentVerifyError::AvailabilityCheck(_) => {
let e = e.into(); let e = e.into();
warn!(self.log, "Peer sent invalid response to parent request."; warn!(self.log, "Peer sent invalid response to parent request.";
"peer_id" => %peer_id, "reason" => %e); "peer_id" => %peer_id, "reason" => %e);
@@ -1328,16 +1324,16 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
} }
} }
fn handle_block_lookup_verify_error<T: BeaconChainTypes>( fn handle_block_lookup_verify_error<T: BeaconChainTypes, Err: Into<&'static str>>(
request_id_ref: &mut u32, request_id_ref: &mut u32,
request_ref: &mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>, request_ref: &mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>,
response_type: ResponseType, response_type: ResponseType,
peer_id: PeerId, peer_id: PeerId,
error: LookupVerifyError, e: Err,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
log: &Logger, log: &Logger,
) -> ShouldRemoveLookup { ) -> ShouldRemoveLookup {
let msg: &str = error.into(); let msg = e.into();
cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); cx.report_peer(peer_id, PeerAction::LowToleranceError, msg);
debug!(log, "Single block lookup failed"; debug!(log, "Single block lookup failed";

View File

@@ -7,7 +7,7 @@ use crate::sync::{
}; };
use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::AsBlock;
use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker};
use beacon_chain::BeaconChainTypes; use beacon_chain::BeaconChainTypes;
use lighthouse_network::PeerId; use lighthouse_network::PeerId;
use std::sync::Arc; use std::sync::Arc;
@@ -45,7 +45,6 @@ pub enum ParentVerifyError {
ExtraBlobsReturned, ExtraBlobsReturned,
InvalidIndex(u64), InvalidIndex(u64),
PreviousFailure { parent_root: Hash256 }, PreviousFailure { parent_root: Hash256 },
AvailabilityCheck(String),
} }
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
@@ -64,6 +63,19 @@ pub enum RequestError {
pub enum LookupDownloadStatus<T: EthSpec> { pub enum LookupDownloadStatus<T: EthSpec> {
Process(BlockWrapper<T>), Process(BlockWrapper<T>),
SearchBlock(Hash256), SearchBlock(Hash256),
AvailabilityCheck(AvailabilityCheckError),
}
impl<T: EthSpec> From<Result<BlockWrapper<T>, AvailabilityCheckError>> for LookupDownloadStatus<T> {
fn from(value: Result<BlockWrapper<T>, AvailabilityCheckError>) -> Self {
match value {
Ok(wrapper) => LookupDownloadStatus::Process(wrapper),
Err(AvailabilityCheckError::MissingBlobs(block_root)) => {
LookupDownloadStatus::SearchBlock(block_root)
}
Err(e) => LookupDownloadStatus::AvailabilityCheck(e),
}
}
} }
impl<T: BeaconChainTypes> ParentLookup<T> { impl<T: BeaconChainTypes> ParentLookup<T> {
@@ -172,22 +184,18 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
&mut self, &mut self,
block_root: Hash256, block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>, block: Arc<SignedBeaconBlock<T::EthSpec>>,
) -> Result<LookupDownloadStatus<T::EthSpec>, ParentVerifyError> { ) -> LookupDownloadStatus<T::EthSpec> {
self.current_parent_request_id = None; self.current_parent_request_id = None;
self.current_parent_request self.current_parent_request.add_block(block_root, block)
.add_block(block_root, block)
.map_err(Into::into)
} }
pub fn add_blobs( pub fn add_blobs(
&mut self, &mut self,
block_root: Hash256, block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>, blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<LookupDownloadStatus<T::EthSpec>, ParentVerifyError> { ) -> LookupDownloadStatus<T::EthSpec> {
self.current_parent_blob_request_id = None; self.current_parent_blob_request_id = None;
self.current_parent_request self.current_parent_request.add_blobs(block_root, blobs)
.add_blobs(block_root, blobs)
.map_err(Into::into)
} }
pub fn pending_block_response(&self, req_id: Id) -> bool { pub fn pending_block_response(&self, req_id: Id) -> bool {
@@ -361,7 +369,6 @@ impl From<LookupVerifyError> for ParentVerifyError {
E::UnrequestedBlobId => ParentVerifyError::UnrequestedBlobId, E::UnrequestedBlobId => ParentVerifyError::UnrequestedBlobId,
E::ExtraBlobsReturned => ParentVerifyError::ExtraBlobsReturned, E::ExtraBlobsReturned => ParentVerifyError::ExtraBlobsReturned,
E::InvalidIndex(index) => ParentVerifyError::InvalidIndex(index), E::InvalidIndex(index) => ParentVerifyError::InvalidIndex(index),
E::AvailabilityCheck(e) => ParentVerifyError::AvailabilityCheck(e),
} }
} }
} }

View File

@@ -109,21 +109,15 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
let block_root = blob.block_root; let block_root = blob.block_root;
if let Some(blob_opt) = self.downloaded_blobs.get_mut(blob.index as usize) { if let Some(blob_opt) = self.downloaded_blobs.get_mut(blob.index as usize) {
//TODO(sean) should we log a warn if there is already a downloaded blob?
*blob_opt = Some(blob.clone()); *blob_opt = Some(blob.clone());
if let Some(block) = self.downloaded_block.as_ref() { if let Some(block) = self.downloaded_block.as_ref() {
match self.da_checker.wrap_block( let result = self.da_checker.wrap_block(
block_root, block_root,
block.clone(), block.clone(),
self.downloaded_blobs.clone(), self.downloaded_blobs.clone(),
) { );
Ok(wrapper) => Ok(LookupDownloadStatus::Process(wrapper)), Ok(LookupDownloadStatus::from(result))
Err(AvailabilityCheckError::MissingBlobs) => {
Ok(LookupDownloadStatus::SearchBlock(block_root))
}
Err(e) => Err(LookupVerifyError::AvailabilityCheck(format!("{e:?}"))),
}
} else { } else {
Ok(LookupDownloadStatus::SearchBlock(block_root)) Ok(LookupDownloadStatus::SearchBlock(block_root))
} }
@@ -136,24 +130,19 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
&mut self, &mut self,
block_root: Hash256, block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>, blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<LookupDownloadStatus<T::EthSpec>, LookupVerifyError> { ) -> LookupDownloadStatus<T::EthSpec> {
for (index, blob_opt) in self.downloaded_blobs.iter_mut().enumerate() { for (index, blob_opt) in self.downloaded_blobs.iter_mut().enumerate() {
if let Some(Some(downloaded_blob)) = blobs.get(index) { if let Some(Some(downloaded_blob)) = blobs.get(index) {
//TODO(sean) should we log a warn if there is already a downloaded blob?
*blob_opt = Some(downloaded_blob.clone()); *blob_opt = Some(downloaded_blob.clone());
} }
} }
if let Some(block) = self.downloaded_block.as_ref() { if let Some(block) = self.downloaded_block.as_ref() {
match self.da_checker.wrap_block(block_root, block.clone(), blobs) { self.da_checker
Ok(wrapper) => Ok(LookupDownloadStatus::Process(wrapper)), .wrap_block(block_root, block.clone(), blobs)
Err(AvailabilityCheckError::MissingBlobs) => { .into()
Ok(LookupDownloadStatus::SearchBlock(block_root))
}
Err(e) => Err(LookupVerifyError::AvailabilityCheck(format!("{e:?}"))),
}
} else { } else {
Ok(LookupDownloadStatus::SearchBlock(block_root)) LookupDownloadStatus::SearchBlock(block_root)
} }
} }
@@ -161,31 +150,22 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
&mut self, &mut self,
block_root: Hash256, block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>, block: Arc<SignedBeaconBlock<T::EthSpec>>,
) -> Result<LookupDownloadStatus<T::EthSpec>, LookupVerifyError> { ) -> LookupDownloadStatus<T::EthSpec> {
//TODO(sean) check for existing block?
self.downloaded_block = Some(block.clone()); self.downloaded_block = Some(block.clone());
match self self.da_checker
.da_checker
.wrap_block(block_root, block, self.downloaded_blobs.clone()) .wrap_block(block_root, block, self.downloaded_blobs.clone())
{ .into()
Ok(wrapper) => Ok(LookupDownloadStatus::Process(wrapper)),
Err(AvailabilityCheckError::MissingBlobs) => {
Ok(LookupDownloadStatus::SearchBlock(block_root))
}
Err(e) => Err(LookupVerifyError::AvailabilityCheck(format!("{e:?}"))),
}
} }
pub fn add_block_wrapper( pub fn add_block_wrapper(
&mut self, &mut self,
block_root: Hash256, block_root: Hash256,
block: BlockWrapper<T::EthSpec>, block: BlockWrapper<T::EthSpec>,
) -> Result<LookupDownloadStatus<T::EthSpec>, LookupVerifyError> { ) -> LookupDownloadStatus<T::EthSpec> {
match block { match block {
BlockWrapper::Block(block) => self.add_block(block_root, block), BlockWrapper::Block(block) => self.add_block(block_root, block),
BlockWrapper::BlockAndBlobs(block, blobs) => { BlockWrapper::BlockAndBlobs(block, blobs) => {
//TODO(sean) check for existing block?
self.downloaded_block = Some(block); self.downloaded_block = Some(block);
self.add_blobs(block_root, blobs) self.add_blobs(block_root, blobs)
} }