mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-17 10:48:28 +00:00
Deprecate blob lookup sync (#9383)
- Extends https://github.com/sigp/lighthouse/pull/9126 to cover blob lookup sync Lookup sync is only for unfinalized blocks, which will never contains blobs in any network we support. Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>
This commit is contained in:
@@ -37,7 +37,6 @@ use {
|
||||
};
|
||||
|
||||
pub use sync_methods::ChainSegmentProcessId;
|
||||
use types::data::FixedBlobSidecarList;
|
||||
|
||||
pub type Error<T> = TrySendError<BeaconWorkEvent<T>>;
|
||||
|
||||
@@ -534,31 +533,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a new `Work` event for some blobs, where the result from computation (if any) is
|
||||
/// sent to the other side of `result_tx`.
|
||||
pub fn send_rpc_blobs(
|
||||
self: &Arc<Self>,
|
||||
block_root: Hash256,
|
||||
blobs: FixedBlobSidecarList<T::EthSpec>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let blob_count = blobs.iter().filter(|b| b.is_some()).count();
|
||||
if blob_count == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
let process_fn = self.clone().generate_rpc_blobs_process_fn(
|
||||
block_root,
|
||||
blobs,
|
||||
seen_timestamp,
|
||||
process_type,
|
||||
);
|
||||
self.try_send(BeaconWorkEvent {
|
||||
drop_during_sync: false,
|
||||
work: Work::RpcBlobs { process_fn },
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a new `Work` event for an RPC-fetched payload envelope. `process_lookup_envelope`
|
||||
/// reports the result back to sync.
|
||||
pub fn send_lookup_envelope(
|
||||
|
||||
@@ -24,10 +24,7 @@ use lighthouse_network::service::api_types::CustodyBackfillBatchId;
|
||||
use logging::crit;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use store::KzgCommitment;
|
||||
use tracing::{debug, debug_span, error, info, instrument, warn};
|
||||
use types::data::FixedBlobSidecarList;
|
||||
use types::kzg_ext::format_kzg_commitments;
|
||||
use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256};
|
||||
|
||||
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
|
||||
@@ -241,114 +238,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
drop(handle);
|
||||
}
|
||||
|
||||
/// Returns an async closure which processes a list of blobs received via RPC.
|
||||
///
|
||||
/// This separate function was required to prevent a cycle during compiler
|
||||
/// type checking.
|
||||
pub fn generate_rpc_blobs_process_fn(
|
||||
self: Arc<Self>,
|
||||
block_root: Hash256,
|
||||
blobs: FixedBlobSidecarList<T::EthSpec>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
) -> AsyncFn {
|
||||
let process_fn = async move {
|
||||
self.clone()
|
||||
.process_rpc_blobs(block_root, blobs, seen_timestamp, process_type)
|
||||
.await;
|
||||
};
|
||||
Box::pin(process_fn)
|
||||
}
|
||||
|
||||
/// Attempt to process a list of blobs received from a direct RPC request.
|
||||
#[instrument(
|
||||
name = "lh_process_rpc_blobs",
|
||||
parent = None,
|
||||
level = "debug",
|
||||
skip_all,
|
||||
fields(?block_root),
|
||||
)]
|
||||
pub async fn process_rpc_blobs(
|
||||
self: Arc<NetworkBeaconProcessor<T>>,
|
||||
block_root: Hash256,
|
||||
blobs: FixedBlobSidecarList<T::EthSpec>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
) {
|
||||
let Some(slot) = blobs
|
||||
.iter()
|
||||
.find_map(|blob| blob.as_ref().map(|blob| blob.slot()))
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
let (indices, commitments): (Vec<u64>, Vec<KzgCommitment>) = blobs
|
||||
.iter()
|
||||
.filter_map(|blob_opt| {
|
||||
blob_opt
|
||||
.as_ref()
|
||||
.map(|blob| (blob.index, blob.kzg_commitment))
|
||||
})
|
||||
.unzip();
|
||||
let commitments = format_kzg_commitments(&commitments);
|
||||
|
||||
debug!(
|
||||
?indices,
|
||||
%block_root,
|
||||
%slot,
|
||||
commitments,
|
||||
"RPC blobs received"
|
||||
);
|
||||
|
||||
if let Ok(current_slot) = self.chain.slot()
|
||||
&& current_slot == slot
|
||||
{
|
||||
// Note: this metric is useful to gauge how long it takes to receive blobs requested
|
||||
// over rpc. Since we always send the request for block components at `get_unaggregated_attestation_due() / 2`
|
||||
// we can use that as a baseline to measure against.
|
||||
let delay = get_slot_delay_ms(seen_timestamp, slot, &self.chain.slot_clock);
|
||||
|
||||
metrics::observe_duration(&metrics::BEACON_BLOB_RPC_SLOT_START_DELAY_TIME, delay);
|
||||
}
|
||||
|
||||
let result = self.chain.process_rpc_blobs(slot, block_root, blobs).await;
|
||||
register_process_result_metrics(&result, metrics::BlockSource::Rpc, "blobs");
|
||||
|
||||
match &result {
|
||||
Ok(AvailabilityProcessingStatus::Imported(hash)) => {
|
||||
debug!(
|
||||
result = "imported block and blobs",
|
||||
%slot,
|
||||
block_hash = %hash,
|
||||
"Block components retrieved"
|
||||
);
|
||||
self.chain.recompute_head_at_current_slot().await;
|
||||
}
|
||||
Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {
|
||||
debug!(
|
||||
block_hash = %block_root,
|
||||
%slot,
|
||||
"Missing components over rpc"
|
||||
);
|
||||
}
|
||||
Err(BlockError::DuplicateFullyImported(_)) => {
|
||||
debug!(
|
||||
block_hash = %block_root,
|
||||
%slot,
|
||||
"Blobs have already been imported"
|
||||
);
|
||||
}
|
||||
// Errors are handled and logged in `block_lookups`
|
||||
Err(_) => {}
|
||||
}
|
||||
|
||||
// Sync handles these results
|
||||
self.send_sync_message(SyncMessage::BlockComponentProcessed {
|
||||
process_type,
|
||||
result: result.into(),
|
||||
});
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "lh_process_rpc_custody_columns",
|
||||
parent = None,
|
||||
|
||||
@@ -41,15 +41,12 @@ use std::iter::Iterator;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use types::data::BlobIdentifier;
|
||||
use types::{
|
||||
AttesterSlashing, BlobSidecar, ChainSpec, DataColumnSidecarList, DataColumnSubnetId, Epoch,
|
||||
EthSpec, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, Hash256,
|
||||
MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock,
|
||||
SignedExecutionPayloadEnvelope, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId,
|
||||
};
|
||||
use types::{
|
||||
BlobSidecarList,
|
||||
data::{BlobIdentifier, FixedBlobSidecarList},
|
||||
AttesterSlashing, ChainSpec, DataColumnSidecarList, DataColumnSubnetId, Epoch, EthSpec,
|
||||
ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, Hash256, MainnetEthSpec,
|
||||
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedExecutionPayloadEnvelope,
|
||||
SignedVoluntaryExit, SingleAttestation, Slot, SubnetId,
|
||||
};
|
||||
|
||||
type E = MainnetEthSpec;
|
||||
@@ -69,7 +66,6 @@ const STANDARD_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
struct TestRig {
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
next_block: Arc<SignedBeaconBlock<E>>,
|
||||
next_blobs: Option<BlobSidecarList<E>>,
|
||||
next_data_columns: Option<DataColumnSidecarList<E>>,
|
||||
attestations: Vec<(SingleAttestation, SubnetId)>,
|
||||
next_block_attestations: Vec<(SingleAttestation, SubnetId)>,
|
||||
@@ -341,7 +337,7 @@ impl TestRig {
|
||||
|
||||
assert!(beacon_processor.is_ok());
|
||||
let block = next_block_tuple.0;
|
||||
let (blob_sidecars, data_columns) = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 {
|
||||
let data_columns = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 {
|
||||
if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
|
||||
let kzg = get_kzg(&chain.spec);
|
||||
let epoch = block.slot().epoch(E::slots_per_epoch());
|
||||
@@ -358,20 +354,17 @@ impl TestRig {
|
||||
.filter(|c| sampling_indices.contains(c.index()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
(None, Some(custody_columns))
|
||||
Some(custody_columns)
|
||||
} else {
|
||||
let blob_sidecars =
|
||||
BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap();
|
||||
(Some(blob_sidecars), None)
|
||||
None
|
||||
}
|
||||
} else {
|
||||
(None, None)
|
||||
None
|
||||
};
|
||||
|
||||
Self {
|
||||
chain,
|
||||
next_block: block,
|
||||
next_blobs: blob_sidecars,
|
||||
next_data_columns: data_columns,
|
||||
attestations,
|
||||
next_block_attestations,
|
||||
@@ -448,20 +441,6 @@ impl TestRig {
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
pub fn enqueue_single_lookup_rpc_blobs(&self) {
|
||||
if let Some(blobs) = self.next_blobs.clone() {
|
||||
let blobs = FixedBlobSidecarList::new(blobs.into_iter().map(Some).collect::<Vec<_>>());
|
||||
self.network_beacon_processor
|
||||
.send_rpc_blobs(
|
||||
self.next_block.canonical_root(),
|
||||
blobs,
|
||||
std::time::Duration::default(),
|
||||
BlockProcessType::SingleBlob { id: 1 },
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn enqueue_single_lookup_rpc_data_columns(&self) {
|
||||
if let Some(data_columns) = self.next_data_columns.clone() {
|
||||
self.network_beacon_processor
|
||||
@@ -1278,7 +1257,6 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
|
||||
);
|
||||
|
||||
// Send the block and ensure that the attestation is received back and imported.
|
||||
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
|
||||
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
|
||||
let mut events = vec![];
|
||||
match import_method {
|
||||
@@ -1293,10 +1271,6 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
|
||||
BlockImportMethod::Rpc => {
|
||||
rig.enqueue_lookup_block();
|
||||
events.push(WorkType::RpcBlock);
|
||||
if num_blobs > 0 {
|
||||
rig.enqueue_single_lookup_rpc_blobs();
|
||||
events.push(WorkType::RpcBlobs);
|
||||
}
|
||||
if num_data_columns > 0 {
|
||||
rig.enqueue_single_lookup_rpc_data_columns();
|
||||
events.push(WorkType::RpcCustodyColumn);
|
||||
@@ -1360,7 +1334,6 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
|
||||
);
|
||||
|
||||
// Send the block and ensure that the attestation is received back and imported.
|
||||
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
|
||||
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
|
||||
let mut events = vec![];
|
||||
match import_method {
|
||||
@@ -1375,10 +1348,6 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
|
||||
BlockImportMethod::Rpc => {
|
||||
rig.enqueue_lookup_block();
|
||||
events.push(WorkType::RpcBlock);
|
||||
if num_blobs > 0 {
|
||||
rig.enqueue_single_lookup_rpc_blobs();
|
||||
events.push(WorkType::RpcBlobs);
|
||||
}
|
||||
if num_data_columns > 0 {
|
||||
rig.enqueue_single_lookup_rpc_data_columns();
|
||||
events.push(WorkType::RpcCustodyColumn);
|
||||
@@ -1565,19 +1534,13 @@ async fn import_misc_gossip_ops() {
|
||||
async fn test_rpc_block_reprocessing() {
|
||||
let mut rig = TestRig::new(SMALL_CHAIN).await;
|
||||
let next_block_root = rig.next_block.canonical_root();
|
||||
|
||||
// Insert the next block into the duplicate cache manually
|
||||
let handle = rig.duplicate_cache.check_and_insert(next_block_root);
|
||||
rig.enqueue_single_lookup_block();
|
||||
rig.assert_event_journal_completes(&[WorkType::RpcBlock])
|
||||
.await;
|
||||
|
||||
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
|
||||
if num_blobs > 0 {
|
||||
rig.enqueue_single_lookup_rpc_blobs();
|
||||
rig.assert_event_journal_completes(&[WorkType::RpcBlobs])
|
||||
.await;
|
||||
}
|
||||
|
||||
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
|
||||
if num_data_columns > 0 {
|
||||
rig.enqueue_single_lookup_rpc_data_columns();
|
||||
|
||||
@@ -340,8 +340,8 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
Response::BlobsByRange(blob) => {
|
||||
self.on_blobs_by_range_response(peer_id, app_request_id, blob);
|
||||
}
|
||||
Response::BlobsByRoot(blob) => {
|
||||
self.on_blobs_by_root_response(peer_id, app_request_id, blob);
|
||||
Response::BlobsByRoot(_) => {
|
||||
crit!(%peer_id, "Unexpected BlobsByRoot response; lookup blob requests removed");
|
||||
}
|
||||
Response::DataColumnsByRoot(data_column) => {
|
||||
self.on_data_columns_by_root_response(peer_id, app_request_id, data_column);
|
||||
@@ -721,40 +721,6 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
});
|
||||
}
|
||||
|
||||
/// Handle a `BlobsByRoot` response from the peer.
|
||||
pub fn on_blobs_by_root_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
app_request_id: AppRequestId,
|
||||
blob_sidecar: Option<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
) {
|
||||
let sync_request_id = match app_request_id {
|
||||
AppRequestId::Sync(sync_id) => match sync_id {
|
||||
id @ SyncRequestId::SingleBlob { .. } => id,
|
||||
other => {
|
||||
crit!(request = ?other, "BlobsByRoot response on incorrect request");
|
||||
return;
|
||||
}
|
||||
},
|
||||
AppRequestId::Router => {
|
||||
crit!(%peer_id, "All BlobsByRoot requests belong to sync");
|
||||
return;
|
||||
}
|
||||
AppRequestId::Internal => unreachable!("Handled internally"),
|
||||
};
|
||||
|
||||
trace!(
|
||||
%peer_id,
|
||||
"Received BlobsByRoot Response"
|
||||
);
|
||||
self.send_to_sync(SyncMessage::RpcBlob {
|
||||
sync_request_id,
|
||||
peer_id,
|
||||
blob_sidecar,
|
||||
seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(),
|
||||
});
|
||||
}
|
||||
|
||||
/// Handle a `DataColumnsByRoot` response from the peer.
|
||||
pub fn on_data_columns_by_root_response(
|
||||
&mut self,
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
use crate::sync::block_lookups::single_block_lookup::{
|
||||
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
|
||||
};
|
||||
use crate::sync::block_lookups::{
|
||||
BlobRequestState, BlockRequestState, CustodyRequestState, PeerId,
|
||||
};
|
||||
use crate::sync::block_lookups::{BlockRequestState, CustodyRequestState, PeerId};
|
||||
use crate::sync::manager::BlockProcessType;
|
||||
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
|
||||
use beacon_chain::BeaconChainTypes;
|
||||
@@ -11,7 +9,6 @@ use lighthouse_network::service::api_types::Id;
|
||||
use parking_lot::RwLock;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use types::data::FixedBlobSidecarList;
|
||||
use types::{DataColumnSidecarList, SignedBeaconBlock};
|
||||
|
||||
use super::SingleLookupId;
|
||||
@@ -20,17 +17,16 @@ use super::single_block_lookup::{ComponentRequests, DownloadResult};
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub enum ResponseType {
|
||||
Block,
|
||||
Blob,
|
||||
CustodyColumn,
|
||||
}
|
||||
|
||||
/// This trait unifies common single block lookup functionality across blocks and blobs. This
|
||||
/// includes making requests, verifying responses, and handling processing results. A
|
||||
/// `SingleBlockLookup` includes both a `BlockRequestState` and a `BlobRequestState`, this trait is
|
||||
/// implemented for each.
|
||||
/// This trait unifies common single block lookup functionality across blocks and data columns.
|
||||
/// This includes making requests, verifying responses, and handling processing results. A
|
||||
/// `SingleBlockLookup` includes both a `BlockRequestState` and a `CustodyRequestState`, this trait
|
||||
/// is implemented for each.
|
||||
///
|
||||
/// The use of the `ResponseType` associated type gives us a degree of type
|
||||
/// safety when handling a block/blob response ensuring we only mutate the correct corresponding
|
||||
/// safety when handling a block/column response ensuring we only mutate the correct corresponding
|
||||
/// state.
|
||||
pub trait RequestState<T: BeaconChainTypes> {
|
||||
/// The type created after validation.
|
||||
@@ -61,7 +57,7 @@ pub trait RequestState<T: BeaconChainTypes> {
|
||||
/// Returns the `ResponseType` associated with this trait implementation. Useful in logging.
|
||||
fn response_type() -> ResponseType;
|
||||
|
||||
/// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait.
|
||||
/// A getter for the `BlockRequestState` or `CustodyRequestState` associated with this trait.
|
||||
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str>;
|
||||
|
||||
/// A getter for a reference to the `SingleLookupRequestState` associated with this trait.
|
||||
@@ -114,54 +110,6 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
|
||||
type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>;
|
||||
|
||||
fn make_request(
|
||||
&self,
|
||||
id: Id,
|
||||
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||
expected_blobs: usize,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupRequestResult, LookupRequestError> {
|
||||
cx.blob_lookup_request(id, lookup_peers, self.block_root, expected_blobs)
|
||||
.map_err(LookupRequestError::SendFailedNetwork)
|
||||
}
|
||||
|
||||
fn send_for_processing(
|
||||
id: Id,
|
||||
download_result: DownloadResult<Self::VerifiedResponseType>,
|
||||
cx: &SyncNetworkContext<T>,
|
||||
) -> Result<(), LookupRequestError> {
|
||||
let DownloadResult {
|
||||
value,
|
||||
block_root,
|
||||
seen_timestamp,
|
||||
..
|
||||
} = download_result;
|
||||
cx.send_blobs_for_processing(id, block_root, value, seen_timestamp)
|
||||
.map_err(LookupRequestError::SendFailedProcessor)
|
||||
}
|
||||
|
||||
fn response_type() -> ResponseType {
|
||||
ResponseType::Blob
|
||||
}
|
||||
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
|
||||
match &mut request.component_requests {
|
||||
ComponentRequests::WaitingForBlock => Err("waiting for block"),
|
||||
ComponentRequests::ActiveBlobRequest(request, _) => Ok(request),
|
||||
ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"),
|
||||
ComponentRequests::NotNeeded { .. } => Err("not needed"),
|
||||
}
|
||||
}
|
||||
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
|
||||
&self.state
|
||||
}
|
||||
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType> {
|
||||
&mut self.state
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
|
||||
type VerifiedResponseType = DataColumnSidecarList<T::EthSpec>;
|
||||
|
||||
@@ -203,7 +151,6 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
|
||||
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
|
||||
match &mut request.component_requests {
|
||||
ComponentRequests::WaitingForBlock => Err("waiting for block"),
|
||||
ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"),
|
||||
ComponentRequests::ActiveCustodyRequest(request) => Ok(request),
|
||||
ComponentRequests::NotNeeded { .. } => Err("not needed"),
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ use fnv::FnvHashMap;
|
||||
use lighthouse_network::service::api_types::SingleLookupReqId;
|
||||
use lighthouse_network::{PeerAction, PeerId};
|
||||
use lru_cache::LRUTimeCache;
|
||||
pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState};
|
||||
pub use single_block_lookup::{BlockRequestState, CustodyRequestState};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -550,9 +550,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
BlockProcessType::SingleBlock { id } => {
|
||||
self.on_processing_result_inner::<BlockRequestState<T::EthSpec>>(id, result, cx)
|
||||
}
|
||||
BlockProcessType::SingleBlob { id } => {
|
||||
self.on_processing_result_inner::<BlobRequestState<T::EthSpec>>(id, result, cx)
|
||||
}
|
||||
BlockProcessType::SingleCustodyColumn(id) => {
|
||||
self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
|
||||
}
|
||||
@@ -696,7 +693,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
PeerAction::MidToleranceError,
|
||||
match R::response_type() {
|
||||
ResponseType::Block => "lookup_block_processing_failure",
|
||||
ResponseType::Blob => "lookup_blobs_processing_failure",
|
||||
ResponseType::CustodyColumn => {
|
||||
"lookup_custody_column_processing_failure"
|
||||
}
|
||||
|
||||
@@ -15,7 +15,6 @@ use std::time::{Duration, Instant};
|
||||
use store::Hash256;
|
||||
use strum::IntoStaticStr;
|
||||
use tracing::{Span, debug_span};
|
||||
use types::data::FixedBlobSidecarList;
|
||||
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot};
|
||||
|
||||
// Dedicated enum for LookupResult to force its usage
|
||||
@@ -77,7 +76,6 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum ComponentRequests<E: EthSpec> {
|
||||
WaitingForBlock,
|
||||
ActiveBlobRequest(BlobRequestState<E>, usize),
|
||||
ActiveCustodyRequest(CustodyRequestState<E>),
|
||||
// When printing in debug this state display the reason why it's not needed
|
||||
#[allow(dead_code)]
|
||||
@@ -176,7 +174,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
self.block_request_state.state.is_processed()
|
||||
&& match &self.component_requests {
|
||||
ComponentRequests::WaitingForBlock => false,
|
||||
ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(),
|
||||
ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(),
|
||||
ComponentRequests::NotNeeded { .. } => true,
|
||||
}
|
||||
@@ -191,9 +188,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
// check if the`block_request_state.state.is_awaiting_event(). However we already
|
||||
// checked that above, so `WaitingForBlock => false` is equivalent.
|
||||
ComponentRequests::WaitingForBlock => false,
|
||||
ComponentRequests::ActiveBlobRequest(request, _) => {
|
||||
request.state.is_awaiting_event()
|
||||
}
|
||||
ComponentRequests::ActiveCustodyRequest(request) => {
|
||||
request.state.is_awaiting_event()
|
||||
}
|
||||
@@ -232,11 +226,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
|
||||
if expected_blobs == 0 {
|
||||
self.component_requests = ComponentRequests::NotNeeded("no data");
|
||||
} else if cx.chain.should_fetch_blobs(block_epoch) {
|
||||
self.component_requests = ComponentRequests::ActiveBlobRequest(
|
||||
BlobRequestState::new(self.block_root),
|
||||
expected_blobs,
|
||||
);
|
||||
} else if cx.chain.should_fetch_custody_columns(block_epoch) {
|
||||
self.component_requests = ComponentRequests::ActiveCustodyRequest(
|
||||
CustodyRequestState::new(self.block_root, block.slot()),
|
||||
@@ -260,9 +249,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
|
||||
match &self.component_requests {
|
||||
ComponentRequests::WaitingForBlock => {} // do nothing
|
||||
ComponentRequests::ActiveBlobRequest(_, expected_blobs) => {
|
||||
self.continue_request::<BlobRequestState<T::EthSpec>>(cx, *expected_blobs)?
|
||||
}
|
||||
ComponentRequests::ActiveCustodyRequest(_) => {
|
||||
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx, 0)?
|
||||
}
|
||||
@@ -373,24 +359,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// The state of the blob request component of a `SingleBlockLookup`.
|
||||
#[derive(Educe)]
|
||||
#[educe(Debug)]
|
||||
pub struct BlobRequestState<E: EthSpec> {
|
||||
#[educe(Debug(ignore))]
|
||||
pub block_root: Hash256,
|
||||
pub state: SingleLookupRequestState<FixedBlobSidecarList<E>>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> BlobRequestState<E> {
|
||||
pub fn new(block_root: Hash256) -> Self {
|
||||
Self {
|
||||
block_root,
|
||||
state: SingleLookupRequestState::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The state of the custody request component of a `SingleBlockLookup`.
|
||||
#[derive(Educe)]
|
||||
#[educe(Debug)]
|
||||
|
||||
@@ -607,11 +607,14 @@ mod tests {
|
||||
|
||||
let mut spec = test_spec::<E>();
|
||||
spec.deneb_fork_epoch = Some(Epoch::new(0));
|
||||
// Pin to pre-PeerDAS so this exercises the blob (not custody-column) path under any
|
||||
// FORK_NAME.
|
||||
spec.fulu_fork_epoch = None;
|
||||
let spec = Arc::new(spec);
|
||||
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
|
||||
// Assert response is finished and RpcBlocks cannot be constructed, because blobs weren't returned.
|
||||
// Blobs are no longer required for availability, so the response succeeds without them.
|
||||
let result = info.responses(da_checker, spec).unwrap();
|
||||
assert!(result.is_err())
|
||||
assert!(result.is_ok())
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -44,7 +44,7 @@ use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProces
|
||||
use crate::service::NetworkMessage;
|
||||
use crate::status::ToStatusMessage;
|
||||
use crate::sync::block_lookups::{
|
||||
BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult,
|
||||
BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult,
|
||||
};
|
||||
use crate::sync::custody_backfill_sync::CustodyBackFillSync;
|
||||
use crate::sync::network_context::{PeerGroup, RpcResponseResult};
|
||||
@@ -197,7 +197,6 @@ pub enum SyncMessage<E: EthSpec> {
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum BlockProcessType {
|
||||
SingleBlock { id: Id },
|
||||
SingleBlob { id: Id },
|
||||
SingleCustodyColumn(Id),
|
||||
SinglePayloadEnvelope(Id),
|
||||
}
|
||||
@@ -206,7 +205,6 @@ impl BlockProcessType {
|
||||
pub fn id(&self) -> Id {
|
||||
match self {
|
||||
BlockProcessType::SingleBlock { id }
|
||||
| BlockProcessType::SingleBlob { id }
|
||||
| BlockProcessType::SingleCustodyColumn(id)
|
||||
| BlockProcessType::SinglePayloadEnvelope(id) => *id,
|
||||
}
|
||||
@@ -507,9 +505,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
SyncRequestId::SingleBlock { id } => {
|
||||
self.on_single_block_response(id, peer_id, RpcEvent::RPCError(error))
|
||||
}
|
||||
SyncRequestId::SingleBlob { id } => {
|
||||
self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error))
|
||||
}
|
||||
SyncRequestId::SinglePayloadEnvelope { id } => {
|
||||
self.on_single_payload_envelope_response(id, peer_id, RpcEvent::RPCError(error))
|
||||
}
|
||||
@@ -1197,11 +1192,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
seen_timestamp: Duration,
|
||||
) {
|
||||
match sync_request_id {
|
||||
SyncRequestId::SingleBlob { id } => self.on_single_blob_response(
|
||||
id,
|
||||
peer_id,
|
||||
RpcEvent::from_chunk(blob, seen_timestamp),
|
||||
),
|
||||
SyncRequestId::BlobsByRange(id) => self.on_blobs_by_range_response(
|
||||
id,
|
||||
peer_id,
|
||||
@@ -1278,24 +1268,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
}
|
||||
}
|
||||
|
||||
fn on_single_blob_response(
|
||||
&mut self,
|
||||
id: SingleLookupReqId,
|
||||
peer_id: PeerId,
|
||||
blob: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
) {
|
||||
if let Some(resp) = self.network.on_single_blob_response(id, peer_id, blob) {
|
||||
self.block_lookups
|
||||
.on_download_response::<BlobRequestState<T::EthSpec>>(
|
||||
id,
|
||||
resp.map(|(value, seen_timestamp)| {
|
||||
(value, PeerGroup::from_single(peer_id), seen_timestamp)
|
||||
}),
|
||||
&mut self.network,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn on_data_columns_by_root_response(
|
||||
&mut self,
|
||||
req_id: DataColumnsByRootRequestId,
|
||||
|
||||
@@ -18,7 +18,6 @@ use crate::status::ToStatusMessage;
|
||||
use crate::sync::batch::ByRangeRequestType;
|
||||
use crate::sync::block_lookups::SingleLookupId;
|
||||
use crate::sync::block_sidecar_coupling::CouplingError;
|
||||
use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest;
|
||||
use crate::sync::range_data_column_batch_request::RangeDataColumnBatchRequest;
|
||||
use beacon_chain::block_verification_types::LookupBlock;
|
||||
use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock};
|
||||
@@ -38,8 +37,8 @@ use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSourc
|
||||
use parking_lot::RwLock;
|
||||
pub use requests::LookupVerifyError;
|
||||
use requests::{
|
||||
ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems,
|
||||
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
|
||||
ActiveRequests, BlobsByRangeRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems,
|
||||
DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
|
||||
PayloadEnvelopesByRootRequestItems,
|
||||
};
|
||||
#[cfg(test)]
|
||||
@@ -53,7 +52,6 @@ use std::time::Duration;
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{Span, debug, debug_span, error, warn};
|
||||
use types::data::FixedBlobSidecarList;
|
||||
use types::{
|
||||
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
|
||||
ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
|
||||
@@ -203,8 +201,6 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
|
||||
/// A mapping of active BlocksByRoot requests, including both current slot and parent lookups.
|
||||
blocks_by_root_requests:
|
||||
ActiveRequests<SingleLookupReqId, BlocksByRootRequestItems<T::EthSpec>>,
|
||||
/// A mapping of active BlobsByRoot requests, including both current slot and parent lookups.
|
||||
blobs_by_root_requests: ActiveRequests<SingleLookupReqId, BlobsByRootRequestItems<T::EthSpec>>,
|
||||
/// A mapping of active PayloadEnvelopesByRoot requests
|
||||
payload_envelopes_by_root_requests:
|
||||
ActiveRequests<SingleLookupReqId, PayloadEnvelopesByRootRequestItems<T::EthSpec>>,
|
||||
@@ -300,7 +296,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
execution_engine_state: EngineState::Online, // always assume `Online` at the start
|
||||
request_id: 1,
|
||||
blocks_by_root_requests: ActiveRequests::new("blocks_by_root"),
|
||||
blobs_by_root_requests: ActiveRequests::new("blobs_by_root"),
|
||||
payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"),
|
||||
data_columns_by_root_requests: ActiveRequests::new("data_columns_by_root"),
|
||||
blocks_by_range_requests: ActiveRequests::new("blocks_by_range"),
|
||||
@@ -329,7 +324,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
network_send: _,
|
||||
request_id: _,
|
||||
blocks_by_root_requests,
|
||||
blobs_by_root_requests,
|
||||
payload_envelopes_by_root_requests,
|
||||
data_columns_by_root_requests,
|
||||
blocks_by_range_requests,
|
||||
@@ -350,10 +344,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
.active_requests_of_peer(peer_id)
|
||||
.into_iter()
|
||||
.map(|id| SyncRequestId::SingleBlock { id: *id });
|
||||
let blobs_by_root_ids = blobs_by_root_requests
|
||||
.active_requests_of_peer(peer_id)
|
||||
.into_iter()
|
||||
.map(|id| SyncRequestId::SingleBlob { id: *id });
|
||||
let payload_envelopes_by_root_ids = payload_envelopes_by_root_requests
|
||||
.active_requests_of_peer(peer_id)
|
||||
.into_iter()
|
||||
@@ -375,7 +365,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
.into_iter()
|
||||
.map(|req_id| SyncRequestId::DataColumnsByRange(*req_id));
|
||||
blocks_by_root_ids
|
||||
.chain(blobs_by_root_ids)
|
||||
.chain(payload_envelopes_by_root_ids)
|
||||
.chain(data_column_by_root_ids)
|
||||
.chain(blocks_by_range_ids)
|
||||
@@ -432,7 +421,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
network_send: _,
|
||||
request_id: _,
|
||||
blocks_by_root_requests,
|
||||
blobs_by_root_requests,
|
||||
payload_envelopes_by_root_requests,
|
||||
data_columns_by_root_requests,
|
||||
blocks_by_range_requests,
|
||||
@@ -455,7 +443,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
|
||||
for peer_id in blocks_by_root_requests
|
||||
.iter_request_peers()
|
||||
.chain(blobs_by_root_requests.iter_request_peers())
|
||||
.chain(payload_envelopes_by_root_requests.iter_request_peers())
|
||||
.chain(data_columns_by_root_requests.iter_request_peers())
|
||||
.chain(blocks_by_range_requests.iter_request_peers())
|
||||
@@ -1017,109 +1004,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
|
||||
Ok(LookupRequestResult::RequestSent(id.req_id))
|
||||
}
|
||||
|
||||
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
|
||||
/// - If we have a downloaded but not yet processed block
|
||||
/// - If the da_checker has a pending block
|
||||
/// - If the da_checker has pending blobs from gossip
|
||||
///
|
||||
/// Returns false if no request was made, because we don't need to import (more) blobs.
|
||||
pub fn blob_lookup_request(
|
||||
&mut self,
|
||||
lookup_id: SingleLookupId,
|
||||
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||
block_root: Hash256,
|
||||
expected_blobs: usize,
|
||||
) -> Result<LookupRequestResult, RpcRequestSendError> {
|
||||
let active_request_count_by_peer = self.active_request_count_by_peer();
|
||||
let Some(peer_id) = lookup_peers
|
||||
.read()
|
||||
.iter()
|
||||
.map(|peer| {
|
||||
(
|
||||
// Prefer peers with less overall requests
|
||||
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
|
||||
// Random factor to break ties, otherwise the PeerID breaks ties
|
||||
rand::random::<u32>(),
|
||||
peer,
|
||||
)
|
||||
})
|
||||
.min()
|
||||
.map(|(_, _, peer)| *peer)
|
||||
else {
|
||||
// Allow lookup to not have any peers and do nothing. This is an optimization to not
|
||||
// lose progress of lookups created from a block with unknown parent before we receive
|
||||
// attestations for said block.
|
||||
// Lookup sync event safety: If a lookup requires peers to make progress, and does
|
||||
// not receive any new peers for some time it will be dropped. If it receives a new
|
||||
// peer it must attempt to make progress.
|
||||
return Ok(LookupRequestResult::Pending("no peers"));
|
||||
};
|
||||
|
||||
let imported_blob_indexes = self
|
||||
.chain
|
||||
.data_availability_checker
|
||||
.cached_blob_indexes(&block_root)
|
||||
.unwrap_or_default();
|
||||
// Include only the blob indexes not yet imported (received through gossip)
|
||||
let indices = (0..expected_blobs as u64)
|
||||
.filter(|index| !imported_blob_indexes.contains(index))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if indices.is_empty() {
|
||||
// No blobs required, do not issue any request
|
||||
return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch"));
|
||||
}
|
||||
|
||||
let id = SingleLookupReqId {
|
||||
lookup_id,
|
||||
req_id: self.next_id(),
|
||||
};
|
||||
|
||||
let request = BlobsByRootSingleBlockRequest {
|
||||
block_root,
|
||||
indices: indices.clone(),
|
||||
};
|
||||
|
||||
// Lookup sync event safety: Refer to `Self::block_lookup_request` `network_send.send` call
|
||||
let network_request = RequestType::BlobsByRoot(
|
||||
request
|
||||
.clone()
|
||||
.into_request(&self.fork_context)
|
||||
.map_err(RpcRequestSendError::InternalError)?,
|
||||
);
|
||||
self.network_send
|
||||
.send(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request: network_request,
|
||||
app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }),
|
||||
})
|
||||
.map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
|
||||
|
||||
debug!(
|
||||
method = "BlobsByRoot",
|
||||
?block_root,
|
||||
blob_indices = ?indices,
|
||||
peer = %peer_id,
|
||||
%id,
|
||||
"Sync RPC request sent"
|
||||
);
|
||||
|
||||
self.blobs_by_root_requests.insert(
|
||||
id,
|
||||
peer_id,
|
||||
// true = enforce max_requests are returned for blobs_by_root. We only issue requests for
|
||||
// blocks after we know the block has data, and only request peers after they claim to
|
||||
// have imported the block+blobs.
|
||||
true,
|
||||
BlobsByRootRequestItems::new(request),
|
||||
// Not implemented
|
||||
Span::none(),
|
||||
);
|
||||
|
||||
Ok(LookupRequestResult::RequestSent(id.req_id))
|
||||
}
|
||||
|
||||
/// Request to send a single `data_columns_by_root` request to the network.
|
||||
pub fn data_column_lookup_request(
|
||||
&mut self,
|
||||
@@ -1522,35 +1406,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
self.on_rpc_response_result(resp, peer_id)
|
||||
}
|
||||
|
||||
pub(crate) fn on_single_blob_response(
|
||||
&mut self,
|
||||
id: SingleLookupReqId,
|
||||
peer_id: PeerId,
|
||||
rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
) -> Option<RpcResponseResult<FixedBlobSidecarList<T::EthSpec>>> {
|
||||
let resp = self.blobs_by_root_requests.on_response(id, rpc_event);
|
||||
let resp = resp.map(|res| {
|
||||
res.and_then(|(blobs, seen_timestamp)| {
|
||||
if let Some(max_len) = blobs
|
||||
.first()
|
||||
.map(|blob| self.chain.spec.max_blobs_per_block(blob.epoch()) as usize)
|
||||
{
|
||||
match to_fixed_blob_sidecar_list(blobs, max_len) {
|
||||
Ok(blobs) => Ok((blobs, seen_timestamp)),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
} else {
|
||||
Err(RpcResponseError::VerifyError(
|
||||
LookupVerifyError::InternalError(
|
||||
"Requested blobs for a block that has no blobs".to_string(),
|
||||
),
|
||||
))
|
||||
}
|
||||
})
|
||||
});
|
||||
self.on_rpc_response_result(resp, peer_id)
|
||||
}
|
||||
|
||||
pub(crate) fn on_single_payload_envelope_response(
|
||||
&mut self,
|
||||
id: SingleLookupReqId,
|
||||
@@ -1718,36 +1573,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn send_blobs_for_processing(
|
||||
&self,
|
||||
id: Id,
|
||||
block_root: Hash256,
|
||||
blobs: FixedBlobSidecarList<T::EthSpec>,
|
||||
seen_timestamp: Duration,
|
||||
) -> Result<(), SendErrorProcessor> {
|
||||
let beacon_processor = self
|
||||
.beacon_processor_if_enabled()
|
||||
.ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
|
||||
|
||||
debug!(?block_root, ?id, "Sending blobs for processing");
|
||||
// Lookup sync event safety: If `beacon_processor.send_rpc_blobs` returns Ok() sync
|
||||
// must receive a single `SyncMessage::BlockComponentProcessed` event with this process type
|
||||
beacon_processor
|
||||
.send_rpc_blobs(
|
||||
block_root,
|
||||
blobs,
|
||||
seen_timestamp,
|
||||
BlockProcessType::SingleBlob { id },
|
||||
)
|
||||
.map_err(|e| {
|
||||
error!(
|
||||
error = ?e,
|
||||
"Failed to send sync blobs to processor"
|
||||
);
|
||||
SendErrorProcessor::SendError
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn send_payload_for_processing(
|
||||
&self,
|
||||
@@ -1914,7 +1739,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
pub(crate) fn register_metrics(&self) {
|
||||
for (id, count) in [
|
||||
("blocks_by_root", self.blocks_by_root_requests.len()),
|
||||
("blobs_by_root", self.blobs_by_root_requests.len()),
|
||||
(
|
||||
"data_columns_by_root",
|
||||
self.data_columns_by_root_requests.len(),
|
||||
@@ -1935,17 +1759,3 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn to_fixed_blob_sidecar_list<E: EthSpec>(
|
||||
blobs: Vec<Arc<BlobSidecar<E>>>,
|
||||
max_len: usize,
|
||||
) -> Result<FixedBlobSidecarList<E>, LookupVerifyError> {
|
||||
let mut fixed_list = FixedBlobSidecarList::new(vec![None; max_len]);
|
||||
for blob in blobs.into_iter() {
|
||||
let index = blob.index as usize;
|
||||
*fixed_list
|
||||
.get_mut(index)
|
||||
.ok_or(LookupVerifyError::UnrequestedIndex(index as u64))? = Some(blob)
|
||||
}
|
||||
Ok(fixed_list)
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ use tracing::{Span, debug};
|
||||
use types::{Hash256, Slot};
|
||||
|
||||
pub use blobs_by_range::BlobsByRangeRequestItems;
|
||||
pub use blobs_by_root::{BlobsByRootRequestItems, BlobsByRootSingleBlockRequest};
|
||||
pub use blocks_by_range::BlocksByRangeRequestItems;
|
||||
pub use blocks_by_root::{BlocksByRootRequestItems, BlocksByRootSingleRequest};
|
||||
pub use data_columns_by_range::DataColumnsByRangeRequestItems;
|
||||
@@ -25,7 +24,6 @@ use crate::metrics;
|
||||
use super::{RpcEvent, RpcResponseError, RpcResponseResult};
|
||||
|
||||
mod blobs_by_range;
|
||||
mod blobs_by_root;
|
||||
mod blocks_by_range;
|
||||
mod blocks_by_root;
|
||||
mod data_columns_by_range;
|
||||
|
||||
@@ -1,73 +0,0 @@
|
||||
use lighthouse_network::rpc::methods::BlobsByRootRequest;
|
||||
use std::sync::Arc;
|
||||
use types::{BlobSidecar, EthSpec, ForkContext, Hash256, data::BlobIdentifier};
|
||||
|
||||
use super::{ActiveRequestItems, LookupVerifyError};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BlobsByRootSingleBlockRequest {
|
||||
pub block_root: Hash256,
|
||||
pub indices: Vec<u64>,
|
||||
}
|
||||
|
||||
impl BlobsByRootSingleBlockRequest {
|
||||
pub fn into_request(self, spec: &ForkContext) -> Result<BlobsByRootRequest, String> {
|
||||
BlobsByRootRequest::new(
|
||||
self.indices
|
||||
.into_iter()
|
||||
.map(|index| BlobIdentifier {
|
||||
block_root: self.block_root,
|
||||
index,
|
||||
})
|
||||
.collect(),
|
||||
spec,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BlobsByRootRequestItems<E: EthSpec> {
|
||||
request: BlobsByRootSingleBlockRequest,
|
||||
items: Vec<Arc<BlobSidecar<E>>>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> BlobsByRootRequestItems<E> {
|
||||
pub fn new(request: BlobsByRootSingleBlockRequest) -> Self {
|
||||
Self {
|
||||
request,
|
||||
items: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> ActiveRequestItems for BlobsByRootRequestItems<E> {
|
||||
type Item = Arc<BlobSidecar<E>>;
|
||||
|
||||
/// Appends a chunk to this multi-item request. If all expected chunks are received, this
|
||||
/// method returns `Some`, resolving the request before the stream terminator.
|
||||
/// The active request SHOULD be dropped after `add_response` returns an error
|
||||
fn add(&mut self, blob: Self::Item) -> Result<bool, LookupVerifyError> {
|
||||
let block_root = blob.block_root();
|
||||
if self.request.block_root != block_root {
|
||||
return Err(LookupVerifyError::UnrequestedBlockRoot(block_root));
|
||||
}
|
||||
|
||||
if !blob.verify_blob_sidecar_inclusion_proof() {
|
||||
return Err(LookupVerifyError::InvalidInclusionProof);
|
||||
}
|
||||
|
||||
if !self.request.indices.contains(&blob.index) {
|
||||
return Err(LookupVerifyError::UnrequestedIndex(blob.index));
|
||||
}
|
||||
if self.items.iter().any(|b| b.index == blob.index) {
|
||||
return Err(LookupVerifyError::DuplicatedData(blob.slot(), blob.index));
|
||||
}
|
||||
|
||||
self.items.push(blob);
|
||||
|
||||
Ok(self.items.len() >= self.request.indices.len())
|
||||
}
|
||||
|
||||
fn consume(&mut self) -> Vec<Self::Item> {
|
||||
std::mem::take(&mut self.items)
|
||||
}
|
||||
}
|
||||
@@ -8,13 +8,11 @@ use crate::sync::{
|
||||
SyncMessage,
|
||||
manager::{BatchProcessResult, BlockProcessType, BlockProcessingResult, SyncManager},
|
||||
};
|
||||
use beacon_chain::blob_verification::KzgVerifiedBlob;
|
||||
use beacon_chain::block_verification_types::LookupBlock;
|
||||
use beacon_chain::custody_context::NodeCustodyType;
|
||||
use beacon_chain::{
|
||||
AvailabilityProcessingStatus, BlockError, EngineState, NotifyExecutionLayer,
|
||||
block_verification_types::{AsBlock, AvailableBlockData},
|
||||
data_availability_checker::Availability,
|
||||
test_utils::{
|
||||
AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, NumBlobs,
|
||||
generate_rand_block_and_blobs, test_spec,
|
||||
@@ -36,8 +34,8 @@ use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::info;
|
||||
use types::{
|
||||
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, EthSpec, ForkContext, ForkName,
|
||||
Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot,
|
||||
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, ForkContext, ForkName, Hash256,
|
||||
MinimalEthSpec as E, SignedBeaconBlock, Slot,
|
||||
};
|
||||
|
||||
const D: Duration = Duration::new(0, 0);
|
||||
@@ -549,52 +547,6 @@ impl TestRig {
|
||||
self.send_rpc_blocks_response(req_id, peer_id, &blocks);
|
||||
}
|
||||
|
||||
(RequestType::BlobsByRoot(req), AppRequestId::Sync(req_id)) => {
|
||||
if self.complete_strategy.return_no_data_n_times > 0 {
|
||||
self.complete_strategy.return_no_data_n_times -= 1;
|
||||
return self.send_rpc_blobs_response(req_id, peer_id, &[]);
|
||||
}
|
||||
|
||||
let mut blobs = req
|
||||
.blob_ids
|
||||
.iter()
|
||||
.map(|id| {
|
||||
self.network_blocks_by_root
|
||||
.get(&id.block_root)
|
||||
.unwrap_or_else(|| {
|
||||
panic!("Test consumer requested unknown block: {id:?}")
|
||||
})
|
||||
.block_data()
|
||||
.blobs()
|
||||
.unwrap_or_else(|| panic!("Block {id:?} has no blobs"))
|
||||
.iter()
|
||||
.find(|blob| blob.index == id.index)
|
||||
.unwrap_or_else(|| panic!("Blob id {id:?} not avail"))
|
||||
.clone()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if self.complete_strategy.return_too_few_data_n_times > 0 {
|
||||
self.complete_strategy.return_too_few_data_n_times -= 1;
|
||||
blobs.pop();
|
||||
}
|
||||
|
||||
if self
|
||||
.complete_strategy
|
||||
.return_wrong_sidecar_for_block_n_times
|
||||
> 0
|
||||
{
|
||||
self.complete_strategy
|
||||
.return_wrong_sidecar_for_block_n_times -= 1;
|
||||
let first = blobs.first_mut().expect("empty blobs");
|
||||
let mut blob = Arc::make_mut(first).clone();
|
||||
blob.signed_block_header.message.body_root = Hash256::ZERO;
|
||||
*first = Arc::new(blob);
|
||||
}
|
||||
|
||||
self.send_rpc_blobs_response(req_id, peer_id, &blobs);
|
||||
}
|
||||
|
||||
(RequestType::DataColumnsByRoot(req), AppRequestId::Sync(req_id)) => {
|
||||
if self.complete_strategy.return_no_data_n_times > 0 {
|
||||
self.complete_strategy.return_no_data_n_times -= 1;
|
||||
@@ -1006,48 +958,6 @@ impl TestRig {
|
||||
keypair.sk.sign(msg)
|
||||
}
|
||||
|
||||
fn corrupt_last_blob_proposer_signature(&mut self) {
|
||||
let range_sync_block = self.get_last_block().clone();
|
||||
let block = range_sync_block.block_cloned();
|
||||
let mut blobs = range_sync_block
|
||||
.block_data()
|
||||
.blobs()
|
||||
.expect("no blobs")
|
||||
.into_iter()
|
||||
.collect::<Vec<_>>();
|
||||
let columns = range_sync_block.block_data().data_columns();
|
||||
let first = blobs.first_mut().expect("empty blobs");
|
||||
Arc::make_mut(first).signed_block_header.signature = self.valid_signature();
|
||||
let max_blobs =
|
||||
self.harness
|
||||
.spec
|
||||
.max_blobs_per_block(block.slot().epoch(E::slots_per_epoch())) as usize;
|
||||
let blobs =
|
||||
types::BlobSidecarList::new(blobs, max_blobs).expect("invalid blob sidecar list");
|
||||
self.re_insert_block(block, Some(blobs), columns);
|
||||
}
|
||||
|
||||
fn corrupt_last_blob_kzg_proof(&mut self) {
|
||||
let range_sync_block = self.get_last_block().clone();
|
||||
let block = range_sync_block.block_cloned();
|
||||
let mut blobs = range_sync_block
|
||||
.block_data()
|
||||
.blobs()
|
||||
.expect("no blobs")
|
||||
.into_iter()
|
||||
.collect::<Vec<_>>();
|
||||
let columns = range_sync_block.block_data().data_columns();
|
||||
let first = blobs.first_mut().expect("empty blobs");
|
||||
Arc::make_mut(first).kzg_proof = kzg::KzgProof::empty();
|
||||
let max_blobs =
|
||||
self.harness
|
||||
.spec
|
||||
.max_blobs_per_block(block.slot().epoch(E::slots_per_epoch())) as usize;
|
||||
let blobs =
|
||||
types::BlobSidecarList::new(blobs, max_blobs).expect("invalid blob sidecar list");
|
||||
self.re_insert_block(block, Some(blobs), columns);
|
||||
}
|
||||
|
||||
fn corrupt_last_column_proposer_signature(&mut self) {
|
||||
let range_sync_block = self.get_last_block().clone();
|
||||
let block = range_sync_block.block_cloned();
|
||||
@@ -1413,10 +1323,6 @@ impl TestRig {
|
||||
|
||||
// Test setup
|
||||
|
||||
fn new_after_deneb() -> Option<Self> {
|
||||
genesis_fork().deneb_enabled().then(Self::default)
|
||||
}
|
||||
|
||||
fn new_after_fulu() -> Option<Self> {
|
||||
genesis_fork().fulu_enabled().then(Self::default)
|
||||
}
|
||||
@@ -1443,10 +1349,6 @@ impl TestRig {
|
||||
info!(msg, "TEST_RIG");
|
||||
}
|
||||
|
||||
pub fn is_after_deneb(&self) -> bool {
|
||||
self.fork_name.deneb_enabled()
|
||||
}
|
||||
|
||||
pub fn is_after_fulu(&self) -> bool {
|
||||
self.fork_name.fulu_enabled()
|
||||
}
|
||||
@@ -1732,27 +1634,6 @@ impl TestRig {
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_blob_to_da_checker(&mut self, blob: Arc<BlobSidecar<E>>) {
|
||||
match self
|
||||
.harness
|
||||
.chain
|
||||
.data_availability_checker
|
||||
.put_kzg_verified_blobs(
|
||||
blob.block_root(),
|
||||
std::iter::once(
|
||||
KzgVerifiedBlob::new(blob, &self.harness.chain.kzg, Duration::new(0, 0))
|
||||
.expect("Invalid blob"),
|
||||
),
|
||||
)
|
||||
.unwrap()
|
||||
{
|
||||
Availability::Available(_) => panic!("column removed from da_checker, available"),
|
||||
Availability::MissingComponents(block_root) => {
|
||||
self.log(&format!("inserted column to da_checker {block_root:?}"))
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
fn insert_block_to_da_checker_as_pre_execution(&mut self, block: Arc<SignedBeaconBlock<E>>) {
|
||||
self.log(&format!(
|
||||
"Inserting block to availability_cache as pre_execution_block {:?}",
|
||||
@@ -1919,18 +1800,14 @@ async fn happy_path_unknown_block_parent(depth: usize) {
|
||||
r.build_chain(depth).await;
|
||||
r.trigger_with_last_unknown_block_parent();
|
||||
r.simulate(SimulateConfig::happy_path()).await;
|
||||
// All lookups should NOT complete on this test, however note the following for the tip lookup,
|
||||
// it's the lookup for the tip block which has 0 peers and a block cached:
|
||||
// Note the following for the tip lookup, it's the lookup for the tip block which has 0 peers
|
||||
// and a block cached:
|
||||
// - before deneb the block is cached, so it's sent for processing, and success
|
||||
// - before fulu the block is cached, but we can't fetch blobs so it's stuck
|
||||
// - deneb/electra the block is cached, so it's sent for processing, and success
|
||||
// - after fulu the block is cached, we start a custody request and since we use the global pool
|
||||
// of peers we DO have 1 connected synced supernode peer, which gives us the columns and the
|
||||
// lookup succeeds
|
||||
if r.is_after_deneb() && !r.is_after_fulu() {
|
||||
r.assert_successful_lookup_sync_parent_trigger()
|
||||
} else {
|
||||
r.assert_successful_lookup_sync();
|
||||
}
|
||||
r.assert_successful_lookup_sync();
|
||||
}
|
||||
|
||||
/// Assert that sync completes from an UnknownDataColumnParent
|
||||
@@ -1978,9 +1855,9 @@ async fn bad_peer_empty_block_response(depth: usize) {
|
||||
// TODO(tree-sync) Assert that a single lookup is created (no drops)
|
||||
}
|
||||
|
||||
/// Assert that if peer responds with no blobs / columns, we downscore, and retry the same lookup
|
||||
/// Assert that if peer responds with no columns, we downscore, and retry the same lookup.
|
||||
async fn bad_peer_empty_data_response(depth: usize) {
|
||||
let Some(mut r) = TestRig::new_after_deneb() else {
|
||||
let Some(mut r) = TestRig::new_after_fulu() else {
|
||||
return;
|
||||
};
|
||||
r.build_chain_and_trigger_last_block(depth).await;
|
||||
@@ -1992,10 +1869,10 @@ async fn bad_peer_empty_data_response(depth: usize) {
|
||||
// TODO(tree-sync) Assert that a single lookup is created (no drops)
|
||||
}
|
||||
|
||||
/// Assert that if peer responds with not enough blobs / columns, we downscore, and retry the same
|
||||
/// lookup
|
||||
/// Assert that if peer responds with not enough columns, we downscore, and retry the same
|
||||
/// lookup.
|
||||
async fn bad_peer_too_few_data_response(depth: usize) {
|
||||
let Some(mut r) = TestRig::new_after_deneb() else {
|
||||
let Some(mut r) = TestRig::new_after_fulu() else {
|
||||
return;
|
||||
};
|
||||
r.build_chain_and_trigger_last_block(depth).await;
|
||||
@@ -2019,9 +1896,9 @@ async fn bad_peer_wrong_block_response(depth: usize) {
|
||||
// TODO(tree-sync) Assert that a single lookup is created (no drops)
|
||||
}
|
||||
|
||||
/// Assert that if peer responds with bad blobs / columns, we downscore, and retry the same lookup
|
||||
/// Assert that if peer responds with bad columns, we downscore, and retry the same lookup.
|
||||
async fn bad_peer_wrong_data_response(depth: usize) {
|
||||
let Some(mut r) = TestRig::new_after_deneb() else {
|
||||
let Some(mut r) = TestRig::new_after_fulu() else {
|
||||
return;
|
||||
};
|
||||
r.build_chain_and_trigger_last_block(depth).await;
|
||||
@@ -2342,8 +2219,8 @@ async fn test_same_chain_race_condition() {
|
||||
#[tokio::test]
|
||||
/// Assert that if the lookup's block is in the da_checker we don't download it again
|
||||
async fn block_in_da_checker_skips_download() {
|
||||
// Only in Deneb, as the block needs blobs to remain in the da_checker
|
||||
let Some(mut r) = TestRig::new_after_deneb_before_fulu() else {
|
||||
// Only post-Fulu, as the block needs custody columns to remain in the da_checker
|
||||
let Some(mut r) = TestRig::new_after_fulu() else {
|
||||
return;
|
||||
};
|
||||
// Add block to da_checker
|
||||
@@ -2407,32 +2284,6 @@ async fn block_in_processing_cache_becomes_valid_imported() {
|
||||
r.assert_no_active_lookups();
|
||||
}
|
||||
|
||||
// IGNORE: wait for change that delays blob fetching to knowing the block
|
||||
#[tokio::test]
|
||||
async fn blobs_in_da_checker_skip_download() {
|
||||
let Some(mut r) = TestRig::new_after_deneb_before_fulu() else {
|
||||
return;
|
||||
};
|
||||
r.build_chain(1).await;
|
||||
let block = r.get_last_block().clone();
|
||||
let blobs = block.block_data().blobs().expect("block with no blobs");
|
||||
for blob in &blobs {
|
||||
r.insert_blob_to_da_checker(blob.clone());
|
||||
}
|
||||
r.trigger_with_last_block();
|
||||
r.simulate(SimulateConfig::happy_path()).await;
|
||||
|
||||
r.assert_successful_lookup_sync();
|
||||
assert_eq!(
|
||||
r.requests
|
||||
.iter()
|
||||
.filter(|(request, _)| matches!(request, RequestType::BlobsByRoot(_)))
|
||||
.collect::<Vec<_>>(),
|
||||
Vec::<&(RequestType<E>, AppRequestId)>::new(),
|
||||
"There should be no blob requests"
|
||||
);
|
||||
}
|
||||
|
||||
macro_rules! fulu_peer_matrix_tests {
|
||||
(
|
||||
[$($name:ident => $variant:expr),+ $(,)?]
|
||||
@@ -2545,42 +2396,6 @@ async fn crypto_on_fail_with_invalid_block_signature() {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn crypto_on_fail_with_bad_blob_proposer_signature() {
|
||||
let Some(mut r) = TestRig::new_after_deneb_before_fulu() else {
|
||||
return;
|
||||
};
|
||||
r.build_chain(1).await;
|
||||
r.corrupt_last_blob_proposer_signature();
|
||||
r.trigger_with_last_block();
|
||||
r.simulate(SimulateConfig::happy_path()).await;
|
||||
if cfg!(feature = "fake_crypto") {
|
||||
r.assert_successful_lookup_sync();
|
||||
r.assert_no_penalties();
|
||||
} else {
|
||||
r.assert_failed_lookup_sync();
|
||||
r.assert_penalties_of_type("lookup_blobs_processing_failure");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn crypto_on_fail_with_bad_blob_kzg_proof() {
|
||||
let Some(mut r) = TestRig::new_after_deneb_before_fulu() else {
|
||||
return;
|
||||
};
|
||||
r.build_chain(1).await;
|
||||
r.corrupt_last_blob_kzg_proof();
|
||||
r.trigger_with_last_block();
|
||||
r.simulate(SimulateConfig::happy_path()).await;
|
||||
if cfg!(feature = "fake_crypto") {
|
||||
r.assert_successful_lookup_sync();
|
||||
r.assert_no_penalties();
|
||||
} else {
|
||||
r.assert_failed_lookup_sync();
|
||||
r.assert_penalties_of_type("lookup_blobs_processing_failure");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn crypto_on_fail_with_bad_column_proposer_signature() {
|
||||
let Some(mut r) = TestRig::new_fulu_peer_test(FuluTestType::WeSupernodeThemSupernode) else {
|
||||
|
||||
Reference in New Issue
Block a user