From 9445ac70d8af36852c69bca62ff9e26697f02647 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Tue, 17 Jan 2023 09:53:37 +0100 Subject: [PATCH 01/21] Check data availability boundary in rpc request --- beacon_node/beacon_chain/src/beacon_chain.rs | 33 +++++++-- .../beacon_processor/worker/rpc_methods.rs | 72 ++++++++++++++----- consensus/types/src/signed_beacon_block.rs | 7 +- consensus/types/src/signed_block_and_blobs.rs | 12 +++- 4 files changed, 99 insertions(+), 25 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index caa73401e2..e5bb002d5b 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2486,7 +2486,7 @@ impl BeaconChain { while let Some((_root, block)) = filtered_chain_segment.first() { // Determine the epoch of the first block in the remaining segment. - let start_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); + let start_epoch = block.epoch(); // The `last_index` indicates the position of the first block in an epoch greater // than the current epoch: partitioning the blocks into a run of blocks in the same @@ -2494,9 +2494,7 @@ impl BeaconChain { // the same `BeaconState`. let last_index = filtered_chain_segment .iter() - .position(|(_root, block)| { - block.slot().epoch(T::EthSpec::slots_per_epoch()) > start_epoch - }) + .position(|(_root, block)| block.epoch() > start_epoch) .unwrap_or(filtered_chain_segment.len()); let mut blocks = filtered_chain_segment.split_off(last_index); @@ -3162,7 +3160,7 @@ impl BeaconChain { // Sync aggregate. if let Ok(sync_aggregate) = block.body().sync_aggregate() { // `SyncCommittee` for the sync_aggregate should correspond to the duty slot - let duty_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); + let duty_epoch = block.epoch(); match self.sync_committee_at_epoch(duty_epoch) { Ok(sync_committee) => { @@ -3429,7 +3427,7 @@ impl BeaconChain { parent_block_slot: Slot, ) { // Do not write to eth1 finalization cache for blocks older than 5 epochs. - if block.slot().epoch(T::EthSpec::slots_per_epoch()) + 5 < current_epoch { + if block.epoch() + 5 < current_epoch { return; } @@ -5860,6 +5858,29 @@ impl BeaconChain { .flatten() } + /// The epoch since which we cater blob data upon a request 'ByRoot'. + /// `None` if the `Eip4844` fork is disabled. + pub fn data_availability_boundary_by_root_rpc_request(&self) -> Option { + self.spec + .eip4844_fork_epoch + .map(|fork_epoch| { + self.epoch().ok().map(|current_epoch| { + vec![ + fork_epoch, + current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS), + self.canonical_head + .cached_head() + .finalized_checkpoint() + .epoch, + ] + .into_iter() + .max() + }) + }) + .flatten() + .flatten() + } + /// Returns `true` if we are at or past the `Eip4844` fork. This will always return `false` if /// the `Eip4844` fork is disabled. pub fn is_data_availability_check_required(&self) -> Result { diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index f08ffe1e61..62e6934ff0 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -246,19 +246,39 @@ impl Worker { "request_root" => ?root ); } - Ok((Some(_), None)) => { - debug!( - self.log, - "Peer requested block and blob, but no blob found"; - "peer" => %peer_id, - "request_root" => ?root - ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "No blob for requested block".into(), - request_id, - ); + Ok((Some(block), None)) => { + let data_availability_boundary_by_root = self.chain.data_availability_boundary_by_root_rpc_request(); + let block_epoch = block.epoch(); + + if Some(block_epoch) >= data_availability_boundary_by_root { + debug!( + self.log, + "Peer requested block and blob that should be available, but no blob found"; + "peer" => %peer_id, + "request_root" => ?root, + "data_availability_boundary_by_root" => data_availability_boundary_by_root, + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "No blob for requested block.".into(), + request_id, + ); + } else { + debug!( + self.log, + "Peer requested block and blob older than the data availability boundary for ByRoot request, no blob found"; + "peer" => %peer_id, + "request_root" => ?root, + "data_availability_boundary_by_root" => data_availability_boundary_by_root, + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + format!("No blob for requested block. Requested blob is older than the data availability boundary for a ByRoot request, currently at epoch {:?}", data_availability_boundary_by_root), + request_id, + ); + } send_response = false; break; } @@ -592,15 +612,33 @@ impl Worker { "start_slot" => req.start_slot, ); + let start_slot = Slot::from(req.start_slot); + let start_epoch = start_slot.epoch(T::EthSpec::slots_per_epoch()); + let data_availability_boundary = self.chain.data_availability_boundary(); + + if Some(start_epoch) < data_availability_boundary { + let oldest_blob_slot = self + .chain + .store + .get_blob_info() + .map(|blob_info| blob_info.oldest_blob_slot); + + debug!(self.log, "Range request start slot is older than data availability boundary"; "requested_slot" => req.start_slot, "oldest_known_slot" => ?oldest_blob_slot, "data_availability_boundary" => data_availability_boundary); + + return self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + format!("Requested start slot in epoch {}. Data availability boundary is currently at epoch {:?}", start_epoch, data_availability_boundary), + request_id, + ); + } + // Should not send more than max request blocks if req.count > MAX_REQUEST_BLOBS_SIDECARS { req.count = MAX_REQUEST_BLOBS_SIDECARS; } - let forwards_block_root_iter = match self - .chain - .forwards_iter_block_roots(Slot::from(req.start_slot)) - { + let forwards_block_root_iter = match self.chain.forwards_iter_block_roots(start_slot) { Ok(iter) => iter, Err(BeaconChainError::HistoricalBlockError( HistoricalBlockError::BlockOutOfRange { diff --git a/consensus/types/src/signed_beacon_block.rs b/consensus/types/src/signed_beacon_block.rs index f57169c72d..89d063365b 100644 --- a/consensus/types/src/signed_beacon_block.rs +++ b/consensus/types/src/signed_beacon_block.rs @@ -195,7 +195,7 @@ impl> SignedBeaconBlock } let domain = spec.get_domain( - self.slot().epoch(E::slots_per_epoch()), + self.epoch(), Domain::BeaconProposer, fork, genesis_validators_root, @@ -227,6 +227,11 @@ impl> SignedBeaconBlock self.message().slot() } + /// Convenience accessor for the block's epoch. + pub fn epoch(&self) -> Epoch { + self.message().slot().epoch(E::slots_per_epoch()) + } + /// Convenience accessor for the block's parent root. pub fn parent_root(&self) -> Hash256 { self.message().parent_root() diff --git a/consensus/types/src/signed_block_and_blobs.rs b/consensus/types/src/signed_block_and_blobs.rs index c589fbcfeb..9d8f4f627b 100644 --- a/consensus/types/src/signed_block_and_blobs.rs +++ b/consensus/types/src/signed_block_and_blobs.rs @@ -1,5 +1,7 @@ use crate::signed_beacon_block::BlobReconstructionError; -use crate::{BlobsSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockEip4844, Slot}; +use crate::{ + BlobsSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockEip4844, Slot, +}; use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; use ssz::{Decode, DecodeError}; @@ -74,6 +76,14 @@ impl BlockWrapper { } } } + pub fn epoch(&self) -> Epoch { + match &self.0 { + BlockWrapperInner::Block(block) => block.epoch(), + BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => { + block_sidecar_pair.beacon_block.epoch() + } + } + } pub fn block(&self) -> &SignedBeaconBlock { match &self.0 { BlockWrapperInner::Block(block) => &block, From b4ec4c1ccf9cad988e34323513644a444a3e31b0 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 18 Jan 2023 14:17:11 +0100 Subject: [PATCH 02/21] Less strict handling of faulty rpc req params and syntax improvement --- .../beacon_processor/worker/rpc_methods.rs | 84 +++++++++++-------- 1 file changed, 51 insertions(+), 33 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 62e6934ff0..61359f031b 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -440,7 +440,10 @@ impl Worker { oldest_block_slot, }, )) => { - debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot); + debug!(self.log, "Range request failed during backfill"; + "requested_slot" => slot, + "oldest_known_slot" => oldest_block_slot + ); return self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, @@ -616,46 +619,61 @@ impl Worker { let start_epoch = start_slot.epoch(T::EthSpec::slots_per_epoch()); let data_availability_boundary = self.chain.data_availability_boundary(); - if Some(start_epoch) < data_availability_boundary { - let oldest_blob_slot = self - .chain - .store - .get_blob_info() - .map(|blob_info| blob_info.oldest_blob_slot); + let serve_blobs_from_slot = match data_availability_boundary { + Some(data_availability_boundary_epoch) => { + if Some(start_epoch) < data_availability_boundary { + let oldest_blob_slot = self + .chain + .store + .get_blob_info() + .map(|blob_info| blob_info.oldest_blob_slot); - debug!(self.log, "Range request start slot is older than data availability boundary"; "requested_slot" => req.start_slot, "oldest_known_slot" => ?oldest_blob_slot, "data_availability_boundary" => data_availability_boundary); + debug!( + self.log, + "Range request start slot is older than data availability boundary"; + "requested_slot" => req.start_slot, + "oldest_known_slot" => ?oldest_blob_slot, + "data_availability_boundary" => data_availability_boundary + ); - return self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - format!("Requested start slot in epoch {}. Data availability boundary is currently at epoch {:?}", start_epoch, data_availability_boundary), - request_id, - ); - } + data_availability_boundary_epoch.start_slot(T::EthSpec::slots_per_epoch()) + } else { + start_slot + } + } + None => { + debug!(self.log, "Eip4844 fork is disabled"); + return; + } + }; // Should not send more than max request blocks if req.count > MAX_REQUEST_BLOBS_SIDECARS { req.count = MAX_REQUEST_BLOBS_SIDECARS; } - let forwards_block_root_iter = match self.chain.forwards_iter_block_roots(start_slot) { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockError( - HistoricalBlockError::BlockOutOfRange { - slot, - oldest_block_slot, - }, - )) => { - debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot); - return self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Backfilling".into(), - request_id, - ); - } - Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), - }; + let forwards_block_root_iter = + match self.chain.forwards_iter_block_roots(serve_blobs_from_slot) { + Ok(iter) => iter, + Err(BeaconChainError::HistoricalBlockError( + HistoricalBlockError::BlockOutOfRange { + slot, + oldest_block_slot, + }, + )) => { + debug!(self.log, "Range request failed during backfill"; + "requested_slot" => slot, + "oldest_known_slot" => oldest_block_slot + ); + return self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Backfilling".into(), + request_id, + ); + } + Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), + }; // Pick out the required blocks, ignoring skip-slots. let mut last_block_root = req From a00b3558001c4d5aa3728bf87ba3781a7b257d79 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 18 Jan 2023 18:52:18 +0100 Subject: [PATCH 03/21] fixup! Less strict handling of faulty rpc req params and syntax improvement --- beacon_node/beacon_chain/src/beacon_chain.rs | 31 +++++++------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e5bb002d5b..b1f9c8d319 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -5858,27 +5858,18 @@ impl BeaconChain { .flatten() } - /// The epoch since which we cater blob data upon a request 'ByRoot'. + /// The epoch that is a data availability boundary, or the latest finalized epoch. /// `None` if the `Eip4844` fork is disabled. - pub fn data_availability_boundary_by_root_rpc_request(&self) -> Option { - self.spec - .eip4844_fork_epoch - .map(|fork_epoch| { - self.epoch().ok().map(|current_epoch| { - vec![ - fork_epoch, - current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS), - self.canonical_head - .cached_head() - .finalized_checkpoint() - .epoch, - ] - .into_iter() - .max() - }) - }) - .flatten() - .flatten() + pub fn finalized_data_availability_boundary(&self) -> Option { + self.data_availability_boundary().map(|boundary| { + std::cmp::max( + boundary, + self.canonical_head + .cached_head() + .finalized_checkpoint() + .epoch, + ) + }) } /// Returns `true` if we are at or past the `Eip4844` fork. This will always return `false` if From 654e59cbba530420d087516c809791d719e7b085 Mon Sep 17 00:00:00 2001 From: Emilia Hane <58548332+emhane@users.noreply.github.com> Date: Wed, 18 Jan 2023 19:40:15 +0100 Subject: [PATCH 04/21] Fix rename fn bug Co-authored-by: realbigsean --- beacon_node/network/src/beacon_processor/worker/rpc_methods.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 61359f031b..2df13f4ea4 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -247,7 +247,7 @@ impl Worker { ); } Ok((Some(block), None)) => { - let data_availability_boundary_by_root = self.chain.data_availability_boundary_by_root_rpc_request(); + let data_availability_boundary_by_root = self.chain.finalized_data_availability_boundary(); let block_epoch = block.epoch(); if Some(block_epoch) >= data_availability_boundary_by_root { From 9cc25162e2f445a5fecce93481360a4b9d4e6d14 Mon Sep 17 00:00:00 2001 From: Emilia Hane <58548332+emhane@users.noreply.github.com> Date: Wed, 18 Jan 2023 19:48:16 +0100 Subject: [PATCH 05/21] Send error message if eip4844 fork disabled Co-authored-by: realbigsean --- .../network/src/beacon_processor/worker/rpc_methods.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 2df13f4ea4..8a3ce9b8bc 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -643,6 +643,12 @@ impl Worker { } None => { debug!(self.log, "Eip4844 fork is disabled"); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Backfilling".into(), + request_id, + ); return; } }; From 89cb58d17b9ca79b151104e75625e31b685b29bc Mon Sep 17 00:00:00 2001 From: Emilia Hane <58548332+emhane@users.noreply.github.com> Date: Wed, 18 Jan 2023 19:54:35 +0100 Subject: [PATCH 06/21] Fix typo Co-authored-by: realbigsean --- beacon_node/network/src/beacon_processor/worker/rpc_methods.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 8a3ce9b8bc..b697c0e7bf 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -646,7 +646,7 @@ impl Worker { self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - "Backfilling".into(), + "Eip4844 fork is disabled".into(), request_id, ); return; From f7f64eb0078b884bc16ba94a0954efa89ffc8bb8 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 18 Jan 2023 16:27:12 -0500 Subject: [PATCH 07/21] fix/consolidate some error handling --- beacon_node/beacon_chain/src/beacon_chain.rs | 58 ++++++++++++------- .../beacon_processor/worker/rpc_methods.rs | 33 ++++++++--- 2 files changed, 63 insertions(+), 28 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index b1f9c8d319..dac3dee8d9 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -971,7 +971,7 @@ impl BeaconChain { } Ok(( self.get_block(block_root).await?.map(Arc::new), - self.get_blobs(block_root).ok().flatten().map(Arc::new), + self.get_blobs(block_root)?.map(Arc::new), )) } @@ -1044,9 +1044,17 @@ impl BeaconChain { /// Returns the blobs at the given root, if any. /// - /// ## Errors + /// Returns `Ok(None)` if the blobs are not found. This could indicate the blob has been pruned + /// or that the block it is referenced by doesn't exist in our database. /// - /// May return a database error. + /// If we can find the corresponding block in our database, we know whether we *should* have + /// blobs. If we should have blobs and no blobs are found, this will error. If we shouldn't, + /// this will reconstruct an empty `BlobsSidecar`. + /// + /// ## Errors + /// - any database read errors + /// - block and blobs are inconsistent in the database + /// - this method is called with a pre-eip4844 block root pub fn get_blobs( &self, block_root: &Hash256, @@ -1054,23 +1062,33 @@ impl BeaconChain { match self.store.get_blobs(block_root)? { Some(blobs) => Ok(Some(blobs)), None => { - if let Ok(Some(block)) = self.get_blinded_block(block_root) { - let expected_kzg_commitments = block.message().body().blob_kzg_commitments()?; - - if expected_kzg_commitments.len() > 0 { - Err(Error::DBInconsistent(format!( - "Expected kzg commitments but no blobs stored for block root {}", - block_root - ))) - } else { - Ok(Some(BlobsSidecar::empty_from_parts( - *block_root, - block.slot(), - ))) - } - } else { - Ok(None) - } + // Check for the corresponding block to understand whether we *should* have blobs. + self + .get_blinded_block(block_root)? + .map(|block| { + // If there are no KZG commitments in the block, we know the sidecar should + // be empty. + let expected_kzg_commitments = block.message().body().blob_kzg_commitments()?; + if expected_kzg_commitments.is_empty() { + Ok(Some(BlobsSidecar::empty_from_parts( + *block_root, + block.slot(), + ))) + } else { + if let Some(boundary) = self.data_availability_boundary() { + // We should have blobs for all blocks after the boundary. + if boundary <= block.epoch() { + return Err(Error::DBInconsistent(format!( + "Expected kzg commitments but no blobs stored for block root {}", + block_root + ))) + } + } + Ok(None) + } + }) + .transpose() + .map(Option::flatten) } } } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index b697c0e7bf..6e48389b55 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -247,35 +247,38 @@ impl Worker { ); } Ok((Some(block), None)) => { - let data_availability_boundary_by_root = self.chain.finalized_data_availability_boundary(); + let finalized_data_availability_boundary = self.chain.finalized_data_availability_boundary(); let block_epoch = block.epoch(); - if Some(block_epoch) >= data_availability_boundary_by_root { + if Some(block_epoch) >= finalized_data_availability_boundary { debug!( self.log, "Peer requested block and blob that should be available, but no blob found"; "peer" => %peer_id, "request_root" => ?root, - "data_availability_boundary_by_root" => data_availability_boundary_by_root, + "finalized_data_availability_boundary" => finalized_data_availability_boundary, ); self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - "No blob for requested block.".into(), + "Blobs unavailable".into(), request_id, ); + send_response = false; + break; } else { debug!( self.log, - "Peer requested block and blob older than the data availability boundary for ByRoot request, no blob found"; + "Peer requested block and blob older than the data availability \ + boundary for ByRoot request, no blob found"; "peer" => %peer_id, "request_root" => ?root, - "data_availability_boundary_by_root" => data_availability_boundary_by_root, + "finalized_data_availability_boundary" => finalized_data_availability_boundary, ); self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - format!("No blob for requested block. Requested blob is older than the data availability boundary for a ByRoot request, currently at epoch {:?}", data_availability_boundary_by_root), + "Blobs unavailable".into(), request_id, ); } @@ -717,7 +720,7 @@ impl Worker { let block_roots = block_roots.into_iter().flatten().collect::>(); let mut blobs_sent = 0; - let send_response = true; + let mut send_response = true; for root in block_roots { match self.chain.get_blobs(&root) { @@ -735,6 +738,13 @@ impl Worker { "No blobs or block in the store for block root"; "block_root" => ?root ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Blobs unavailable".into(), + request_id, + ); + send_response = false; break; } Err(e) => { @@ -744,6 +754,13 @@ impl Worker { "block_root" => ?root, "error" => ?e ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ServerError, + "Failed fetching blobs".into(), + request_id, + ); + send_response = false; break; } } From e1ce4e5b7851f7d18c4d74758d5a3ec213aa794d Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 18 Jan 2023 17:47:32 -0500 Subject: [PATCH 08/21] make explicity BlobsUnavailable error and handle it directly --- beacon_node/beacon_chain/src/beacon_chain.rs | 11 +++---- beacon_node/beacon_chain/src/errors.rs | 1 + .../beacon_processor/worker/rpc_methods.rs | 30 +++++++++++++++++++ 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index dac3dee8d9..a43d82ff32 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1063,12 +1063,12 @@ impl BeaconChain { Some(blobs) => Ok(Some(blobs)), None => { // Check for the corresponding block to understand whether we *should* have blobs. - self - .get_blinded_block(block_root)? + self.get_blinded_block(block_root)? .map(|block| { // If there are no KZG commitments in the block, we know the sidecar should // be empty. - let expected_kzg_commitments = block.message().body().blob_kzg_commitments()?; + let expected_kzg_commitments = + block.message().body().blob_kzg_commitments()?; if expected_kzg_commitments.is_empty() { Ok(Some(BlobsSidecar::empty_from_parts( *block_root, @@ -1078,10 +1078,7 @@ impl BeaconChain { if let Some(boundary) = self.data_availability_boundary() { // We should have blobs for all blocks after the boundary. if boundary <= block.epoch() { - return Err(Error::DBInconsistent(format!( - "Expected kzg commitments but no blobs stored for block root {}", - block_root - ))) + return Err(Error::BlobsUnavailable); } } Ok(None) diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 25f2554f3d..e744e2af5b 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -209,6 +209,7 @@ pub enum BeaconChainError { BlsToExecutionChangeBadFork(ForkName), InconsistentFork(InconsistentFork), ProposerHeadForkChoiceError(fork_choice::Error), + BlobsUnavailable, } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 6e48389b55..8888e6f9f7 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -293,6 +293,21 @@ impl Worker { "request_root" => ?root ); } + Err(BeaconChainError::BlobsUnavailable) => { + error!( + self.log, + "No blobs in the store for block root"; + "block_root" => ?root + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Blobs unavailable".into(), + request_id, + ); + send_response = false; + break; + } Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { debug!( self.log, @@ -747,6 +762,21 @@ impl Worker { send_response = false; break; } + Err(BeaconChainError::BlobsUnavailable) => { + error!( + self.log, + "No blobs in the store for block root"; + "block_root" => ?root + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Blobs unavailable".into(), + request_id, + ); + send_response = false; + break; + } Err(e) => { error!( self.log, From c6479444c2b87e3c61e21edbd43154e08f6fb43d Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 18 Jan 2023 18:01:46 -0500 Subject: [PATCH 09/21] don't send errors when we *correctly* don't have blobs --- .../beacon_processor/worker/rpc_methods.rs | 23 ------------------- 1 file changed, 23 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 8888e6f9f7..aff0c76511 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -258,14 +258,6 @@ impl Worker { "request_root" => ?root, "finalized_data_availability_boundary" => finalized_data_availability_boundary, ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Blobs unavailable".into(), - request_id, - ); - send_response = false; - break; } else { debug!( self.log, @@ -275,15 +267,7 @@ impl Worker { "request_root" => ?root, "finalized_data_availability_boundary" => finalized_data_availability_boundary, ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Blobs unavailable".into(), - request_id, - ); } - send_response = false; - break; } Ok((None, Some(_))) => { debug!( @@ -753,13 +737,6 @@ impl Worker { "No blobs or block in the store for block root"; "block_root" => ?root ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Blobs unavailable".into(), - request_id, - ); - send_response = false; break; } Err(BeaconChainError::BlobsUnavailable) => { From 8e57eef0ed9f679a75c0dc80824f9064dd935f20 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 18 Jan 2023 18:15:03 -0500 Subject: [PATCH 10/21] return a `BlobsUnavailable` error when the block root is a pre-4844 block --- beacon_node/beacon_chain/src/beacon_chain.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a43d82ff32..99b463e0bc 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1068,7 +1068,10 @@ impl BeaconChain { // If there are no KZG commitments in the block, we know the sidecar should // be empty. let expected_kzg_commitments = - block.message().body().blob_kzg_commitments()?; + match block.message().body().blob_kzg_commitments() { + Ok(kzg_commitments) => kzg_commitments, + Err(_) => return Err(Error::BlobsUnavailable), + }; if expected_kzg_commitments.is_empty() { Ok(Some(BlobsSidecar::empty_from_parts( *block_root, From f7eb89ddd9ef9ed341ec3f7b566b84755de6e0ed Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Fri, 20 Jan 2023 21:16:34 +0100 Subject: [PATCH 11/21] Improve error handling --- beacon_node/beacon_chain/src/beacon_chain.rs | 6 +-- beacon_node/beacon_chain/src/errors.rs | 2 + .../beacon_processor/worker/rpc_methods.rs | 38 +++++++++++++++++-- 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 99b463e0bc..aba9be3262 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1070,7 +1070,7 @@ impl BeaconChain { let expected_kzg_commitments = match block.message().body().blob_kzg_commitments() { Ok(kzg_commitments) => kzg_commitments, - Err(_) => return Err(Error::BlobsUnavailable), + Err(_) => return Err(Error::NoKzgCommitmentsFieldOnBlock), }; if expected_kzg_commitments.is_empty() { Ok(Some(BlobsSidecar::empty_from_parts( @@ -1079,12 +1079,12 @@ impl BeaconChain { ))) } else { if let Some(boundary) = self.data_availability_boundary() { - // We should have blobs for all blocks after the boundary. + // We should have blobs for all blocks younger than the boundary. if boundary <= block.epoch() { return Err(Error::BlobsUnavailable); } } - Ok(None) + Err(Error::BlobsOlderThanDataAvailabilityBoundary) } }) .transpose() diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index e744e2af5b..9a9b09fe1f 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -210,6 +210,8 @@ pub enum BeaconChainError { InconsistentFork(InconsistentFork), ProposerHeadForkChoiceError(fork_choice::Error), BlobsUnavailable, + NoKzgCommitmentsFieldOnBlock, + BlobsOlderThanDataAvailabilityBoundary, } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index aff0c76511..441a82c12b 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -251,7 +251,7 @@ impl Worker { let block_epoch = block.epoch(); if Some(block_epoch) >= finalized_data_availability_boundary { - debug!( + error!( self.log, "Peer requested block and blob that should be available, but no blob found"; "peer" => %peer_id, @@ -270,7 +270,7 @@ impl Worker { } } Ok((None, Some(_))) => { - debug!( + error!( self.log, "Peer requested block and blob, but no block found"; "peer" => %peer_id, @@ -754,17 +754,47 @@ impl Worker { send_response = false; break; } + Err(BeaconChainError::NoKzgCommitmentsFieldOnBlock) => { + error!( + self.log, + "No kzg_commitments field in block"; + "block_root" => ?root, + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Failed reading field kzg_commitments from block".into(), + request_id, + ); + send_response = false; + break; + } + Err(BeaconChainError::BlobsOlderThanDataAvailabilityBoundary) => { + error!( + self.log, + "Failed loading blobs older than data availability boundary"; + "block_root" => ?root, + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Blobs older than data availability boundary".into(), + request_id, + ); + send_response = false; + break; + } Err(e) => { error!( self.log, - "Error fetching blob for peer"; + "Error fetching blinded block for block root"; "block_root" => ?root, "error" => ?e ); self.send_error_response( peer_id, RPCResponseErrorCode::ServerError, - "Failed fetching blobs".into(), + "No blobs and failed fetching corresponding block".into(), request_id, ); send_response = false; From 5fc648217d58d57ea5536a8f602696a8bf8e1e9b Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Sat, 21 Jan 2023 14:46:24 +0100 Subject: [PATCH 12/21] fixup! Improve error handling --- .../beacon_processor/worker/rpc_methods.rs | 48 ++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 441a82c12b..ba16905aa8 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -250,23 +250,37 @@ impl Worker { let finalized_data_availability_boundary = self.chain.finalized_data_availability_boundary(); let block_epoch = block.epoch(); - if Some(block_epoch) >= finalized_data_availability_boundary { - error!( - self.log, - "Peer requested block and blob that should be available, but no blob found"; - "peer" => %peer_id, - "request_root" => ?root, - "finalized_data_availability_boundary" => finalized_data_availability_boundary, - ); - } else { - debug!( - self.log, - "Peer requested block and blob older than the data availability \ - boundary for ByRoot request, no blob found"; - "peer" => %peer_id, - "request_root" => ?root, - "finalized_data_availability_boundary" => finalized_data_availability_boundary, - ); + match finalized_data_availability_boundary { + Some(boundary_epoch) => { + if block_epoch >= finalized_data_availability_boundary { + error!( + self.log, + "Peer requested block and blob that should be available, but no blob found"; + "peer" => %peer_id, + "request_root" => ?root, + "finalized_data_availability_boundary" => finalized_data_availability_boundary, + ); + } else { + debug!( + self.log, + "Peer requested block and blob older than the data availability \ + boundary for ByRoot request, no blob found"; + "peer" => %peer_id, + "request_root" => ?root, + "finalized_data_availability_boundary" => finalized_data_availability_boundary, + ); + } + } + None => { + debug!(self.log, "Eip4844 fork is disabled"); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Eip4844 fork is disabled".into(), + request_id, + ); + return; + } } } Ok((None, Some(_))) => { From f32f08eec0df9b1f7dab1590e255392c85b1127a Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Sat, 21 Jan 2023 14:47:14 +0100 Subject: [PATCH 13/21] Fix typo --- .../network/src/beacon_processor/worker/rpc_methods.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index ba16905aa8..00b7038cf1 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -367,7 +367,7 @@ impl Worker { self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not avaiable".into(), + "Bootstrap not available".into(), request_id, ); return; @@ -377,7 +377,7 @@ impl Worker { self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not avaiable".into(), + "Bootstrap not available".into(), request_id, ); return; @@ -390,7 +390,7 @@ impl Worker { self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not avaiable".into(), + "Bootstrap not available".into(), request_id, ); return; @@ -400,7 +400,7 @@ impl Worker { self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not avaiable".into(), + "Bootstrap not available".into(), request_id, ); return; @@ -412,7 +412,7 @@ impl Worker { self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not avaiable".into(), + "Bootstrap not available".into(), request_id, ); return; From 81a754577dcfe337d0ffd23822839738088cf23a Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Sat, 21 Jan 2023 15:47:33 +0100 Subject: [PATCH 14/21] fixup! Improve error handling --- .../lighthouse_network/src/rpc/methods.rs | 16 ++- .../beacon_processor/worker/rpc_methods.rs | 113 ++++++++++++------ 2 files changed, 94 insertions(+), 35 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 02e24d8e1d..8ee427da52 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -507,9 +507,23 @@ impl std::fmt::Display for OldBlocksByRangeRequest { } } +impl std::fmt::Display for BlobsByRootRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Request: BlobsByRoot: Number of Requested Roots: {}", + self.block_roots.len() + ) + } +} + impl std::fmt::Display for BlobsByRangeRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Start Slot: {}, Count: {}", self.start_slot, self.count) + write!( + f, + "Request: BlobsByRange: Start Slot: {}, Count: {}", + self.start_slot, self.count + ) } } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 00b7038cf1..b9abf8ea92 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -122,7 +122,10 @@ impl Worker { }; self.send_sync_message(SyncMessage::AddPeer(peer_id, info)); } - Err(e) => error!(self.log, "Could not process status message"; "error" => ?e), + Err(e) => error!(self.log, "Could not process status message"; + "peer" => %peer_id, + "error" => ?e + ), } } @@ -252,13 +255,14 @@ impl Worker { match finalized_data_availability_boundary { Some(boundary_epoch) => { - if block_epoch >= finalized_data_availability_boundary { + if block_epoch >= boundary_epoch { error!( self.log, "Peer requested block and blob that should be available, but no blob found"; + "request" => %request, "peer" => %peer_id, "request_root" => ?root, - "finalized_data_availability_boundary" => finalized_data_availability_boundary, + "finalized_data_availability_boundary" => %boundary_epoch, ); } else { debug!( @@ -267,7 +271,7 @@ impl Worker { boundary for ByRoot request, no blob found"; "peer" => %peer_id, "request_root" => ?root, - "finalized_data_availability_boundary" => finalized_data_availability_boundary, + "finalized_data_availability_boundary" => ?finalized_data_availability_boundary, ); } } @@ -287,6 +291,7 @@ impl Worker { error!( self.log, "Peer requested block and blob, but no block found"; + "request" => %request, "peer" => %peer_id, "request_root" => ?root ); @@ -295,6 +300,8 @@ impl Worker { error!( self.log, "No blobs in the store for block root"; + "request" => %request, + "peer" => %peer_id, "block_root" => ?root ); self.send_error_response( @@ -436,8 +443,8 @@ impl Worker { ) { debug!(self.log, "Received BlocksByRange Request"; "peer_id" => %peer_id, - "count" => req.count, - "start_slot" => req.start_slot, + "count" => %req.count, + "start_slot" => %req.start_slot, ); // Should not send more than max request blocks @@ -457,8 +464,8 @@ impl Worker { }, )) => { debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot + "requested_slot" => %slot, + "oldest_known_slot" => %oldest_block_slot ); return self.send_error_response( peer_id, @@ -467,7 +474,13 @@ impl Worker { request_id, ); } - Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), + Err(e) => { + return error!(self.log, "Unable to obtain root iter"; + "request" => %req, + "peer" => %peer_id, + "error" => ?e + ) + } }; // Pick out the required blocks, ignoring skip-slots. @@ -499,7 +512,13 @@ impl Worker { let block_roots = match maybe_block_roots { Ok(block_roots) => block_roots, - Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e), + Err(e) => { + return error!(self.log, "Error during iteration over blocks"; + "request" => %req, + "peer" => %peer_id, + "error" => ?e + ) + } }; // remove all skip slots @@ -531,6 +550,8 @@ impl Worker { error!( self.log, "Block in the chain is not in the store"; + "request" => %req, + "peer" => %peer_id, "request_root" => ?root ); break; @@ -556,6 +577,8 @@ impl Worker { error!( self.log, "Error fetching block for peer"; + "request" => %req, + "peer" => %peer_id, "block_root" => ?root, "error" => ?e ); @@ -584,20 +607,20 @@ impl Worker { "BlocksByRange outgoing response processed"; "peer" => %peer_id, "msg" => "Failed to return all requested blocks", - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blocks_sent + "start_slot" => %req.start_slot, + "current_slot" => %current_slot, + "requested" => %req.count, + "returned" => %blocks_sent ); } else { debug!( self.log, "BlocksByRange outgoing response processed"; "peer" => %peer_id, - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blocks_sent + "start_slot" => %req.start_slot, + "current_slot" => %current_slot, + "requested" => %req.count, + "returned" => %blocks_sent ); } @@ -627,8 +650,8 @@ impl Worker { ) { debug!(self.log, "Received BlobsByRange Request"; "peer_id" => %peer_id, - "count" => req.count, - "start_slot" => req.start_slot, + "count" => %req.count, + "start_slot" => %req.start_slot, ); let start_slot = Slot::from(req.start_slot); @@ -647,8 +670,8 @@ impl Worker { debug!( self.log, "Range request start slot is older than data availability boundary"; - "requested_slot" => req.start_slot, - "oldest_known_slot" => ?oldest_blob_slot, + "requested_slot" => %req.start_slot, + "oldest_known_slot" => oldest_blob_slot, "data_availability_boundary" => data_availability_boundary ); @@ -684,8 +707,8 @@ impl Worker { }, )) => { debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot + "requested_slot" => %slot, + "oldest_known_slot" => %oldest_block_slot ); return self.send_error_response( peer_id, @@ -694,7 +717,13 @@ impl Worker { request_id, ); } - Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), + Err(e) => { + return error!(self.log, "Unable to obtain root iter"; + "request" => %req, + "peer" => %peer_id, + "error" => ?e + ) + } }; // Pick out the required blocks, ignoring skip-slots. @@ -726,7 +755,13 @@ impl Worker { let block_roots = match maybe_block_roots { Ok(block_roots) => block_roots, - Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e), + Err(e) => { + return error!(self.log, "Error during iteration over blocks"; + "request" => %req, + "peer" => %peer_id, + "error" => ?e + ) + } }; // remove all skip slots @@ -749,6 +784,8 @@ impl Worker { error!( self.log, "No blobs or block in the store for block root"; + "request" => %req, + "peer" => %peer_id, "block_root" => ?root ); break; @@ -757,6 +794,8 @@ impl Worker { error!( self.log, "No blobs in the store for block root"; + "request" => %req, + "peer" => %peer_id, "block_root" => ?root ); self.send_error_response( @@ -772,6 +811,8 @@ impl Worker { error!( self.log, "No kzg_commitments field in block"; + "request" => %req, + "peer" => %peer_id, "block_root" => ?root, ); self.send_error_response( @@ -787,6 +828,8 @@ impl Worker { error!( self.log, "Failed loading blobs older than data availability boundary"; + "request" => %req, + "peer" => %peer_id, "block_root" => ?root, ); self.send_error_response( @@ -802,6 +845,8 @@ impl Worker { error!( self.log, "Error fetching blinded block for block root"; + "request" => %req, + "peer" => %peer_id, "block_root" => ?root, "error" => ?e ); @@ -828,20 +873,20 @@ impl Worker { "BlobsByRange Response processed"; "peer" => %peer_id, "msg" => "Failed to return all requested blobs", - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blobs_sent + "start_slot" => %req.start_slot, + "current_slot" => %current_slot, + "requested" => %req.count, + "returned" => %blobs_sent ); } else { debug!( self.log, "BlobsByRange Response processed"; "peer" => %peer_id, - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blobs_sent + "start_slot" => %req.start_slot, + "current_slot" => %current_slot, + "requested" => %req.count, + "returned" => %blobs_sent ); } From e14550425d51bd47a67581f37025ab0755925fca Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Mon, 23 Jan 2023 13:23:04 +0100 Subject: [PATCH 15/21] Fix mismatched response bug --- beacon_node/network/src/beacon_processor/worker/rpc_methods.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index b9abf8ea92..f08dac547c 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -351,7 +351,7 @@ impl Worker { // send stream termination if send_response { - self.send_response(peer_id, Response::BlocksByRoot(None), request_id); + self.send_response(peer_id, Response::BlobsByRoot(None), request_id); } drop(send_on_drop); }, From b658cc7aaf6443dff2aafb956dbd218e255ed795 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 24 Jan 2023 10:50:47 +0100 Subject: [PATCH 16/21] simplify checking attester cache for block and blobs. use ResourceUnavailable according to the spec --- beacon_node/beacon_chain/src/beacon_chain.rs | 33 +++--- beacon_node/beacon_chain/src/errors.rs | 2 +- .../src/peer_manager/mod.rs | 1 + .../lighthouse_network/src/rpc/methods.rs | 4 + .../beacon_processor/worker/rpc_methods.rs | 108 +++++++----------- 5 files changed, 66 insertions(+), 82 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index aba9be3262..4d086fcdf5 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -957,22 +957,24 @@ impl BeaconChain { &self, block_root: &Hash256, ) -> Result< - ( - Option>>, - Option>>, - ), + Option<( + Arc>, + Arc>, + )>, Error, > { if let (Some(block), Some(blobs)) = ( self.early_attester_cache.get_block(*block_root), self.early_attester_cache.get_blobs(*block_root), ) { - return Ok((Some(block), Some(blobs))); + return Ok(Some((block, blobs))); + } + if let Some(block) = self.get_block(block_root).await?.map(Arc::new) { + let blobs = self.get_blobs(block_root)?.map(Arc::new); + Ok(blobs.map(|blobs| (block, blobs))) + } else { + Ok(None) } - Ok(( - self.get_block(block_root).await?.map(Arc::new), - self.get_blobs(block_root)?.map(Arc::new), - )) } /// Returns the block at the given root, if any. @@ -1044,8 +1046,8 @@ impl BeaconChain { /// Returns the blobs at the given root, if any. /// - /// Returns `Ok(None)` if the blobs are not found. This could indicate the blob has been pruned - /// or that the block it is referenced by doesn't exist in our database. + /// Returns `Ok(None)` if the blobs and associated block are not found. The block referenced by + /// the blob doesn't exist in our database. /// /// If we can find the corresponding block in our database, we know whether we *should* have /// blobs. If we should have blobs and no blobs are found, this will error. If we shouldn't, @@ -1055,6 +1057,7 @@ impl BeaconChain { /// - any database read errors /// - block and blobs are inconsistent in the database /// - this method is called with a pre-eip4844 block root + /// - this method is called for a blob that is beyond the prune depth pub fn get_blobs( &self, block_root: &Hash256, @@ -1073,10 +1076,7 @@ impl BeaconChain { Err(_) => return Err(Error::NoKzgCommitmentsFieldOnBlock), }; if expected_kzg_commitments.is_empty() { - Ok(Some(BlobsSidecar::empty_from_parts( - *block_root, - block.slot(), - ))) + Ok(BlobsSidecar::empty_from_parts(*block_root, block.slot())) } else { if let Some(boundary) = self.data_availability_boundary() { // We should have blobs for all blocks younger than the boundary. @@ -1084,11 +1084,10 @@ impl BeaconChain { return Err(Error::BlobsUnavailable); } } - Err(Error::BlobsOlderThanDataAvailabilityBoundary) + Err(Error::BlobsOlderThanDataAvailabilityBoundary(block.epoch())) } }) .transpose() - .map(Option::flatten) } } } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 9a9b09fe1f..fcb2c53a4a 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -211,7 +211,7 @@ pub enum BeaconChainError { ProposerHeadForkChoiceError(fork_choice::Error), BlobsUnavailable, NoKzgCommitmentsFieldOnBlock, - BlobsOlderThanDataAvailabilityBoundary, + BlobsOlderThanDataAvailabilityBoundary(Epoch), } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index f0ccc72af7..9176f16f25 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -513,6 +513,7 @@ impl PeerManager { Protocol::MetaData => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, }, + RPCResponseErrorCode::BlobsNotFoundForBlock => PeerAction::LowToleranceError, }, RPCError::SSZDecodeError(_) => PeerAction::Fatal, RPCError::UnsupportedProtocol => { diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 8ee427da52..7563b8f137 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -330,6 +330,7 @@ pub struct LightClientBootstrapRequest { #[strum(serialize_all = "snake_case")] pub enum RPCResponseErrorCode { RateLimited, + BlobsNotFoundForBlock, InvalidRequest, ServerError, /// Error spec'd to indicate that a peer does not have blocks on a requested range. @@ -359,6 +360,7 @@ impl RPCCodedResponse { 2 => RPCResponseErrorCode::ServerError, 3 => RPCResponseErrorCode::ResourceUnavailable, 139 => RPCResponseErrorCode::RateLimited, + 142 => RPCResponseErrorCode::BlobsNotFoundForBlock, _ => RPCResponseErrorCode::Unknown, }; RPCCodedResponse::Error(code, err) @@ -397,6 +399,7 @@ impl RPCResponseErrorCode { RPCResponseErrorCode::ResourceUnavailable => 3, RPCResponseErrorCode::Unknown => 255, RPCResponseErrorCode::RateLimited => 139, + RPCResponseErrorCode::BlobsNotFoundForBlock => 140, } } } @@ -425,6 +428,7 @@ impl std::fmt::Display for RPCResponseErrorCode { RPCResponseErrorCode::ServerError => "Server error occurred", RPCResponseErrorCode::Unknown => "Unknown error occurred", RPCResponseErrorCode::RateLimited => "Rate limited", + RPCResponseErrorCode::BlobsNotFoundForBlock => "No blobs for the given root", }; f.write_str(repr) } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index f08dac547c..d9a3c2eea3 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -230,7 +230,7 @@ impl Worker { .get_block_and_blobs_checking_early_attester_cache(root) .await { - Ok((Some(block), Some(blobs))) => { + Ok(Some((block, blobs))) => { self.send_response( peer_id, Response::BlobsByRoot(Some(SignedBeaconBlockAndBlobsSidecar { @@ -241,7 +241,7 @@ impl Worker { ); send_block_count += 1; } - Ok((None, None)) => { + Ok(None) => { debug!( self.log, "Peer requested unknown block and blobs"; @@ -249,9 +249,41 @@ impl Worker { "request_root" => ?root ); } - Ok((Some(block), None)) => { + Err(BeaconChainError::BlobsUnavailable) => { + error!( + self.log, + "No blobs in the store for block root"; + "request" => %request, + "peer" => %peer_id, + "block_root" => ?root + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::BlobsNotFoundForBlock, + "Blobs not found for block root".into(), + request_id, + ); + send_response = false; + break; + } + Err(BeaconChainError::NoKzgCommitmentsFieldOnBlock) => { + error!( + self.log, + "No kzg_commitments field in block"; + "peer" => %peer_id, + "block_root" => ?root, + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Failed reading field kzg_commitments from block".into(), + request_id, + ); + send_response = false; + break; + } + Err(BeaconChainError::BlobsOlderThanDataAvailabilityBoundary(block_epoch)) => { let finalized_data_availability_boundary = self.chain.finalized_data_availability_boundary(); - let block_epoch = block.epoch(); match finalized_data_availability_boundary { Some(boundary_epoch) => { @@ -264,6 +296,14 @@ impl Worker { "request_root" => ?root, "finalized_data_availability_boundary" => %boundary_epoch, ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Blobs older than data availability boundary".into(), + request_id, + ); + send_response = false; + break; } else { debug!( self.log, @@ -287,32 +327,6 @@ impl Worker { } } } - Ok((None, Some(_))) => { - error!( - self.log, - "Peer requested block and blob, but no block found"; - "request" => %request, - "peer" => %peer_id, - "request_root" => ?root - ); - } - Err(BeaconChainError::BlobsUnavailable) => { - error!( - self.log, - "No blobs in the store for block root"; - "request" => %request, - "peer" => %peer_id, - "block_root" => ?root - ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Blobs unavailable".into(), - request_id, - ); - send_response = false; - break; - } Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { debug!( self.log, @@ -807,40 +821,6 @@ impl Worker { send_response = false; break; } - Err(BeaconChainError::NoKzgCommitmentsFieldOnBlock) => { - error!( - self.log, - "No kzg_commitments field in block"; - "request" => %req, - "peer" => %peer_id, - "block_root" => ?root, - ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Failed reading field kzg_commitments from block".into(), - request_id, - ); - send_response = false; - break; - } - Err(BeaconChainError::BlobsOlderThanDataAvailabilityBoundary) => { - error!( - self.log, - "Failed loading blobs older than data availability boundary"; - "request" => %req, - "peer" => %peer_id, - "block_root" => ?root, - ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Blobs older than data availability boundary".into(), - request_id, - ); - send_response = false; - break; - } Err(e) => { error!( self.log, From 5b4cd997d0658e38a9e271e07c2e069c3195c630 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 24 Jan 2023 12:20:40 +0100 Subject: [PATCH 17/21] Update beacon_node/lighthouse_network/src/rpc/methods.rs --- beacon_node/lighthouse_network/src/rpc/methods.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 7563b8f137..088cf90fa9 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -360,7 +360,7 @@ impl RPCCodedResponse { 2 => RPCResponseErrorCode::ServerError, 3 => RPCResponseErrorCode::ResourceUnavailable, 139 => RPCResponseErrorCode::RateLimited, - 142 => RPCResponseErrorCode::BlobsNotFoundForBlock, + 140 => RPCResponseErrorCode::BlobsNotFoundForBlock, _ => RPCResponseErrorCode::Unknown, }; RPCCodedResponse::Error(code, err) From a4ea1761bb7fcef7a1706e6c7c21855f553a488c Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 24 Jan 2023 12:28:58 +0100 Subject: [PATCH 18/21] Update beacon_node/beacon_chain/src/beacon_chain.rs --- beacon_node/beacon_chain/src/beacon_chain.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 4d086fcdf5..268cd87d17 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1046,8 +1046,7 @@ impl BeaconChain { /// Returns the blobs at the given root, if any. /// - /// Returns `Ok(None)` if the blobs and associated block are not found. The block referenced by - /// the blob doesn't exist in our database. + /// Returns `Ok(None)` if the blobs and associated block are not found. /// /// If we can find the corresponding block in our database, we know whether we *should* have /// blobs. If we should have blobs and no blobs are found, this will error. If we shouldn't, From 2225e6ac8946b841c74c8164a58d2db1aa366942 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 24 Jan 2023 14:35:07 +0100 Subject: [PATCH 19/21] pass in data availability boundary to the get_blobs method --- beacon_node/beacon_chain/src/beacon_chain.rs | 41 ++++--- .../beacon_processor/worker/rpc_methods.rs | 113 +++++++----------- 2 files changed, 67 insertions(+), 87 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 268cd87d17..1e775000b1 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -963,15 +963,25 @@ impl BeaconChain { )>, Error, > { - if let (Some(block), Some(blobs)) = ( - self.early_attester_cache.get_block(*block_root), - self.early_attester_cache.get_blobs(*block_root), - ) { - return Ok(Some((block, blobs))); - } - if let Some(block) = self.get_block(block_root).await?.map(Arc::new) { - let blobs = self.get_blobs(block_root)?.map(Arc::new); - Ok(blobs.map(|blobs| (block, blobs))) + // If there is no data availability boundary, the Eip4844 fork is disabled. + if let Some(finalized_data_availability_boundary) = + self.finalized_data_availability_boundary() + { + // Only use the attester cache if we can find both the block and blob + if let (Some(block), Some(blobs)) = ( + self.early_attester_cache.get_block(*block_root), + self.early_attester_cache.get_blobs(*block_root), + ) { + Ok(Some((block, blobs))) + // Attempt to get the block and blobs from the database + } else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) { + let blobs = self + .get_blobs(block_root, finalized_data_availability_boundary)? + .map(Arc::new); + Ok(blobs.map(|blobs| (block, blobs))) + } else { + Ok(None) + } } else { Ok(None) } @@ -1046,7 +1056,7 @@ impl BeaconChain { /// Returns the blobs at the given root, if any. /// - /// Returns `Ok(None)` if the blobs and associated block are not found. + /// Returns `Ok(None)` if the blobs and associated block are not found. /// /// If we can find the corresponding block in our database, we know whether we *should* have /// blobs. If we should have blobs and no blobs are found, this will error. If we shouldn't, @@ -1060,6 +1070,7 @@ impl BeaconChain { pub fn get_blobs( &self, block_root: &Hash256, + data_availability_boundary: Epoch, ) -> Result>, Error> { match self.store.get_blobs(block_root)? { Some(blobs) => Ok(Some(blobs)), @@ -1076,13 +1087,11 @@ impl BeaconChain { }; if expected_kzg_commitments.is_empty() { Ok(BlobsSidecar::empty_from_parts(*block_root, block.slot())) + } else if data_availability_boundary <= block.epoch() { + // We should have blobs for all blocks younger than the boundary. + Err(Error::BlobsUnavailable) } else { - if let Some(boundary) = self.data_availability_boundary() { - // We should have blobs for all blocks younger than the boundary. - if boundary <= block.epoch() { - return Err(Error::BlobsUnavailable); - } - } + // We shouldn't have blobs for blocks older than the boundary. Err(Error::BlobsOlderThanDataAvailabilityBoundary(block.epoch())) } }) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index d9a3c2eea3..451c31668b 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -267,9 +267,9 @@ impl Worker { break; } Err(BeaconChainError::NoKzgCommitmentsFieldOnBlock) => { - error!( + debug!( self.log, - "No kzg_commitments field in block"; + "Peer requested blobs for a pre-eip4844 block"; "peer" => %peer_id, "block_root" => ?root, ); @@ -283,49 +283,22 @@ impl Worker { break; } Err(BeaconChainError::BlobsOlderThanDataAvailabilityBoundary(block_epoch)) => { - let finalized_data_availability_boundary = self.chain.finalized_data_availability_boundary(); - - match finalized_data_availability_boundary { - Some(boundary_epoch) => { - if block_epoch >= boundary_epoch { - error!( - self.log, - "Peer requested block and blob that should be available, but no blob found"; - "request" => %request, - "peer" => %peer_id, - "request_root" => ?root, - "finalized_data_availability_boundary" => %boundary_epoch, - ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Blobs older than data availability boundary".into(), - request_id, - ); - send_response = false; - break; - } else { - debug!( - self.log, - "Peer requested block and blob older than the data availability \ - boundary for ByRoot request, no blob found"; - "peer" => %peer_id, - "request_root" => ?root, - "finalized_data_availability_boundary" => ?finalized_data_availability_boundary, - ); - } - } - None => { - debug!(self.log, "Eip4844 fork is disabled"); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Eip4844 fork is disabled".into(), - request_id, - ); - return; - } - } + debug!( + self.log, + "Peer requested block and blobs older than the data availability \ + boundary for ByRoot request, no blob found"; + "peer" => %peer_id, + "request_root" => ?root, + "block_epoch" => ?block_epoch, + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Blobs older than data availability boundary".into(), + request_id, + ); + send_response = false; + break; } Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { debug!( @@ -670,35 +643,13 @@ impl Worker { let start_slot = Slot::from(req.start_slot); let start_epoch = start_slot.epoch(T::EthSpec::slots_per_epoch()); - let data_availability_boundary = self.chain.data_availability_boundary(); - - let serve_blobs_from_slot = match data_availability_boundary { - Some(data_availability_boundary_epoch) => { - if Some(start_epoch) < data_availability_boundary { - let oldest_blob_slot = self - .chain - .store - .get_blob_info() - .map(|blob_info| blob_info.oldest_blob_slot); - - debug!( - self.log, - "Range request start slot is older than data availability boundary"; - "requested_slot" => %req.start_slot, - "oldest_known_slot" => oldest_blob_slot, - "data_availability_boundary" => data_availability_boundary - ); - - data_availability_boundary_epoch.start_slot(T::EthSpec::slots_per_epoch()) - } else { - start_slot - } - } + let data_availability_boundary = match self.chain.data_availability_boundary() { + Some(boundary) => boundary, None => { debug!(self.log, "Eip4844 fork is disabled"); self.send_error_response( peer_id, - RPCResponseErrorCode::ResourceUnavailable, + RPCResponseErrorCode::ServerError, "Eip4844 fork is disabled".into(), request_id, ); @@ -706,6 +657,26 @@ impl Worker { } }; + let serve_blobs_from_slot = if start_epoch < data_availability_boundary { + let oldest_blob_slot = self + .chain + .store + .get_blob_info() + .map(|blob_info| blob_info.oldest_blob_slot); + + debug!( + self.log, + "Range request start slot is older than data availability boundary"; + "requested_slot" => %req.start_slot, + "oldest_known_slot" => oldest_blob_slot, + "data_availability_boundary" => data_availability_boundary + ); + + data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch()) + } else { + start_slot + }; + // Should not send more than max request blocks if req.count > MAX_REQUEST_BLOBS_SIDECARS { req.count = MAX_REQUEST_BLOBS_SIDECARS; @@ -785,7 +756,7 @@ impl Worker { let mut send_response = true; for root in block_roots { - match self.chain.get_blobs(&root) { + match self.chain.get_blobs(&root, data_availability_boundary) { Ok(Some(blobs)) => { blobs_sent += 1; self.send_network_message(NetworkMessage::SendResponse { From 18d4faf61101a0752af2ba1408f4bf4265f77075 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 24 Jan 2023 15:30:29 +0100 Subject: [PATCH 20/21] review updates --- beacon_node/beacon_chain/src/beacon_chain.rs | 18 +-- .../beacon_processor/worker/rpc_methods.rs | 133 +++++++++++------- 2 files changed, 89 insertions(+), 62 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 1e775000b1..345a402bd9 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -956,13 +956,7 @@ impl BeaconChain { pub async fn get_block_and_blobs_checking_early_attester_cache( &self, block_root: &Hash256, - ) -> Result< - Option<( - Arc>, - Arc>, - )>, - Error, - > { + ) -> Result>, Error> { // If there is no data availability boundary, the Eip4844 fork is disabled. if let Some(finalized_data_availability_boundary) = self.finalized_data_availability_boundary() @@ -972,13 +966,19 @@ impl BeaconChain { self.early_attester_cache.get_block(*block_root), self.early_attester_cache.get_blobs(*block_root), ) { - Ok(Some((block, blobs))) + Ok(Some(SignedBeaconBlockAndBlobsSidecar { + beacon_block: block, + blobs_sidecar: blobs, + })) // Attempt to get the block and blobs from the database } else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) { let blobs = self .get_blobs(block_root, finalized_data_availability_boundary)? .map(Arc::new); - Ok(blobs.map(|blobs| (block, blobs))) + Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar { + beacon_block: block, + blobs_sidecar: blobs, + })) } else { Ok(None) } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 451c31668b..106722fd8d 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -14,8 +14,9 @@ use slog::{debug, error}; use slot_clock::SlotClock; use std::sync::Arc; use task_executor::TaskExecutor; +use tokio::count; use types::light_client_bootstrap::LightClientBootstrap; -use types::{Epoch, EthSpec, Hash256, SignedBeaconBlockAndBlobsSidecar, Slot}; +use types::{Epoch, EthSpec, Hash256, Slot}; use super::Worker; @@ -198,7 +199,7 @@ impl Worker { "Received BlocksByRoot Request"; "peer" => %peer_id, "requested" => request.block_roots.len(), - "returned" => %send_block_count + "returned" => send_block_count ); // send stream termination @@ -230,13 +231,10 @@ impl Worker { .get_block_and_blobs_checking_early_attester_cache(root) .await { - Ok(Some((block, blobs))) => { + Ok(Some(block_and_blobs)) => { self.send_response( peer_id, - Response::BlobsByRoot(Some(SignedBeaconBlockAndBlobsSidecar { - beacon_block: block, - blobs_sidecar: blobs, - })), + Response::BlobsByRoot(Some(block_and_blobs)), request_id, ); send_block_count += 1; @@ -253,7 +251,7 @@ impl Worker { error!( self.log, "No blobs in the store for block root"; - "request" => %request, + "request" => ?request, "peer" => %peer_id, "block_root" => ?root ); @@ -333,7 +331,7 @@ impl Worker { "Received BlobsByRoot Request"; "peer" => %peer_id, "requested" => request.block_roots.len(), - "returned" => %send_block_count + "returned" => send_block_count ); // send stream termination @@ -430,13 +428,18 @@ impl Worker { ) { debug!(self.log, "Received BlocksByRange Request"; "peer_id" => %peer_id, - "count" => %req.count, - "start_slot" => %req.start_slot, + "count" => req.count, + "start_slot" => req.start_slot, ); // Should not send more than max request blocks if req.count > MAX_REQUEST_BLOCKS { - req.count = MAX_REQUEST_BLOCKS; + return self.send_error_response( + peer_id, + RPCResponseErrorCode::InvalidRequest, + "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`".into(), + request_id, + ); } let forwards_block_root_iter = match self @@ -451,8 +454,8 @@ impl Worker { }, )) => { debug!(self.log, "Range request failed during backfill"; - "requested_slot" => %slot, - "oldest_known_slot" => %oldest_block_slot + "requested_slot" => slot, + "oldest_known_slot" => oldest_block_slot ); return self.send_error_response( peer_id, @@ -463,7 +466,7 @@ impl Worker { } Err(e) => { return error!(self.log, "Unable to obtain root iter"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "error" => ?e ) @@ -501,7 +504,7 @@ impl Worker { Ok(block_roots) => block_roots, Err(e) => { return error!(self.log, "Error during iteration over blocks"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "error" => ?e ) @@ -537,7 +540,7 @@ impl Worker { error!( self.log, "Block in the chain is not in the store"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "request_root" => ?root ); @@ -564,7 +567,7 @@ impl Worker { error!( self.log, "Error fetching block for peer"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "block_root" => ?root, "error" => ?e @@ -594,20 +597,20 @@ impl Worker { "BlocksByRange outgoing response processed"; "peer" => %peer_id, "msg" => "Failed to return all requested blocks", - "start_slot" => %req.start_slot, - "current_slot" => %current_slot, - "requested" => %req.count, - "returned" => %blocks_sent + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blocks_sent ); } else { debug!( self.log, "BlocksByRange outgoing response processed"; "peer" => %peer_id, - "start_slot" => %req.start_slot, - "current_slot" => %current_slot, - "requested" => %req.count, - "returned" => %blocks_sent + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blocks_sent ); } @@ -637,12 +640,20 @@ impl Worker { ) { debug!(self.log, "Received BlobsByRange Request"; "peer_id" => %peer_id, - "count" => %req.count, - "start_slot" => %req.start_slot, + "count" => req.count, + "start_slot" => req.start_slot, ); - let start_slot = Slot::from(req.start_slot); - let start_epoch = start_slot.epoch(T::EthSpec::slots_per_epoch()); + // Should not send more than max request blocks + if req.count > MAX_REQUEST_BLOBS_SIDECARS { + return self.send_error_response( + peer_id, + RPCResponseErrorCode::InvalidRequest, + "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`".into(), + request_id, + ); + } + let data_availability_boundary = match self.chain.data_availability_boundary() { Some(boundary) => boundary, None => { @@ -657,31 +668,47 @@ impl Worker { } }; + let start_slot = Slot::from(req.start_slot); + let start_epoch = start_slot.epoch(T::EthSpec::slots_per_epoch()); + + // If the peer requests data from beyond the data availability boundary we altruistically + // cap to the right time range. let serve_blobs_from_slot = if start_epoch < data_availability_boundary { + // Attempt to serve from the earliest block in our database, falling back to the data + // availability boundary let oldest_blob_slot = self .chain .store .get_blob_info() - .map(|blob_info| blob_info.oldest_blob_slot); + .map(|blob_info| blob_info.oldest_blob_slot) + .unwrap_or(data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch())); debug!( self.log, "Range request start slot is older than data availability boundary"; - "requested_slot" => %req.start_slot, + "requested_slot" => req.start_slot, "oldest_known_slot" => oldest_blob_slot, "data_availability_boundary" => data_availability_boundary ); - data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch()) + // Check if the request is entirely out of the data availability period. The + // `oldest_blob_slot` is the oldest slot in the database, so includes a margin of error + // controlled by our prune margin. + let end_request_slot = start_slot + count; + if oldest_blob_slot < end_request_slot { + return self.send_error_response( + peer_id, + RPCResponseErrorCode::InvalidRequest, + "Request outside of data availability period".into(), + request_id, + ); + } + std::cmp::max(oldest_blob_slot, start_slot) } else { start_slot }; - // Should not send more than max request blocks - if req.count > MAX_REQUEST_BLOBS_SIDECARS { - req.count = MAX_REQUEST_BLOBS_SIDECARS; - } - + // If the peer requests data from beyond the data availability boundary we altruistically cap to the right time range let forwards_block_root_iter = match self.chain.forwards_iter_block_roots(serve_blobs_from_slot) { Ok(iter) => iter, @@ -692,8 +719,8 @@ impl Worker { }, )) => { debug!(self.log, "Range request failed during backfill"; - "requested_slot" => %slot, - "oldest_known_slot" => %oldest_block_slot + "requested_slot" => slot, + "oldest_known_slot" => oldest_block_slot ); return self.send_error_response( peer_id, @@ -704,7 +731,7 @@ impl Worker { } Err(e) => { return error!(self.log, "Unable to obtain root iter"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "error" => ?e ) @@ -742,7 +769,7 @@ impl Worker { Ok(block_roots) => block_roots, Err(e) => { return error!(self.log, "Error during iteration over blocks"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "error" => ?e ) @@ -769,7 +796,7 @@ impl Worker { error!( self.log, "No blobs or block in the store for block root"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "block_root" => ?root ); @@ -779,7 +806,7 @@ impl Worker { error!( self.log, "No blobs in the store for block root"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "block_root" => ?root ); @@ -796,7 +823,7 @@ impl Worker { error!( self.log, "Error fetching blinded block for block root"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "block_root" => ?root, "error" => ?e @@ -824,20 +851,20 @@ impl Worker { "BlobsByRange Response processed"; "peer" => %peer_id, "msg" => "Failed to return all requested blobs", - "start_slot" => %req.start_slot, - "current_slot" => %current_slot, - "requested" => %req.count, - "returned" => %blobs_sent + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blobs_sent ); } else { debug!( self.log, "BlobsByRange Response processed"; "peer" => %peer_id, - "start_slot" => %req.start_slot, - "current_slot" => %current_slot, - "requested" => %req.count, - "returned" => %blobs_sent + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blobs_sent ); } From d3240c1ffb9d9949b2adca9b08a17aedaa8f909c Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 24 Jan 2023 15:42:28 +0100 Subject: [PATCH 21/21] fix common issue across blocks by range and blobs by range --- .../beacon_processor/worker/rpc_methods.rs | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 106722fd8d..0bd4eebcc1 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -14,7 +14,6 @@ use slog::{debug, error}; use slot_clock::SlotClock; use std::sync::Arc; use task_executor::TaskExecutor; -use tokio::count; use types::light_client_bootstrap::LightClientBootstrap; use types::{Epoch, EthSpec, Hash256, Slot}; @@ -424,7 +423,7 @@ impl Worker { send_on_drop: SendOnDrop, peer_id: PeerId, request_id: PeerRequestId, - mut req: BlocksByRangeRequest, + req: BlocksByRangeRequest, ) { debug!(self.log, "Received BlocksByRange Request"; "peer_id" => %peer_id, @@ -465,11 +464,17 @@ impl Worker { ); } Err(e) => { + self.send_error_response( + peer_id, + RPCResponseErrorCode::ServerError, + "Database error".into(), + request_id, + ); return error!(self.log, "Unable to obtain root iter"; "request" => ?req, "peer" => %peer_id, "error" => ?e - ) + ); } }; @@ -544,6 +549,13 @@ impl Worker { "peer" => %peer_id, "request_root" => ?root ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ServerError, + "Database inconsistency".into(), + request_id, + ); + send_response = false; break; } Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { @@ -636,7 +648,7 @@ impl Worker { send_on_drop: SendOnDrop, peer_id: PeerId, request_id: PeerRequestId, - mut req: BlobsByRangeRequest, + req: BlobsByRangeRequest, ) { debug!(self.log, "Received BlobsByRange Request"; "peer_id" => %peer_id, @@ -694,7 +706,7 @@ impl Worker { // Check if the request is entirely out of the data availability period. The // `oldest_blob_slot` is the oldest slot in the database, so includes a margin of error // controlled by our prune margin. - let end_request_slot = start_slot + count; + let end_request_slot = start_slot + req.count; if oldest_blob_slot < end_request_slot { return self.send_error_response( peer_id, @@ -730,11 +742,17 @@ impl Worker { ); } Err(e) => { + self.send_error_response( + peer_id, + RPCResponseErrorCode::ServerError, + "Database error".into(), + request_id, + ); return error!(self.log, "Unable to obtain root iter"; "request" => ?req, "peer" => %peer_id, "error" => ?e - ) + ); } }; @@ -800,6 +818,13 @@ impl Worker { "peer" => %peer_id, "block_root" => ?root ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ServerError, + "Database inconsistency".into(), + request_id, + ); + send_response = false; break; } Err(BeaconChainError::BlobsUnavailable) => {