Make CGC queries dependant on the block slot

This commit is contained in:
dapplion
2025-03-28 13:10:17 -03:00
parent ca8eaea116
commit 0978156987
15 changed files with 165 additions and 110 deletions

View File

@@ -16,6 +16,7 @@ use strum::AsRefStr;
use strum::IntoEnumIterator;
use types::DataColumnSubnetId;
use types::EthSpec;
use types::Slot;
pub const SUCCESS: &str = "SUCCESS";
pub const FAILURE: &str = "FAILURE";
@@ -742,7 +743,7 @@ pub fn update_gossip_metrics<E: EthSpec>(
}
}
pub fn update_sync_metrics<E: EthSpec>(network_globals: &Arc<NetworkGlobals<E>>) {
pub fn update_sync_metrics<E: EthSpec>(network_globals: &Arc<NetworkGlobals<E>>, clock_slot: Slot) {
// reset the counts
if PEERS_PER_SYNC_TYPE
.as_ref()
@@ -771,7 +772,7 @@ pub fn update_sync_metrics<E: EthSpec>(network_globals: &Arc<NetworkGlobals<E>>)
let all_column_subnets =
(0..network_globals.spec.data_column_sidecar_subnet_count).map(DataColumnSubnetId::new);
let custody_column_subnets = network_globals.sampling_subnets.iter();
let custody_column_subnets = network_globals.sampling_subnets(clock_slot).iter();
// Iterate all subnet values to set to zero the empty entries in peers_per_column_subnet
for subnet in all_column_subnets {

View File

@@ -1128,6 +1128,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
) {
let processing_start_time = Instant::now();
let block_root = verified_data_column.block_root();
let block_slot = verified_data_column.slot();
let data_column_slot = verified_data_column.slot();
let data_column_index = verified_data_column.id().index;
@@ -1159,7 +1160,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"Processed data column, waiting for other components"
);
self.attempt_data_column_reconstruction(block_root).await;
self.attempt_data_column_reconstruction(block_root, block_slot)
.await;
}
},
Err(BlockError::DuplicateFullyImported(_)) => {
@@ -1259,7 +1261,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.clone()
.verify_block_for_gossip(
block.clone(),
self.network_globals.custody_columns_count() as usize,
self.network_globals.custody_columns_count(block.slot()) as usize,
)
.await;

View File

@@ -927,7 +927,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
publish_blobs: bool,
) {
let is_supernode = self.network_globals.is_supernode();
let is_supernode = self.network_globals.is_supernode(block.slot());
let self_cloned = self.clone();
let publish_fn = move |blobs_or_data_column| {
@@ -1008,9 +1008,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
async fn attempt_data_column_reconstruction(
self: &Arc<Self>,
block_root: Hash256,
block_slot: Slot,
) -> Option<AvailabilityProcessingStatus> {
// Only supernodes attempt reconstruction
if !self.network_globals.is_supernode() {
if !self.network_globals.is_supernode(block_slot) {
return None;
}

View File

@@ -383,8 +383,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
// Attempt reconstruction here before notifying sync, to avoid sending out more requests
// that we may no longer need.
if let Some(availability) =
self.attempt_data_column_reconstruction(block_root).await
if let Some(availability) = self
.attempt_data_column_reconstruction(block_root, slot)
.await
{
result = Ok(availability)
}

View File

@@ -420,9 +420,9 @@ impl<T: BeaconChainTypes> NetworkService<T> {
metrics::update_gossip_metrics::<T::EthSpec>(
self.libp2p.gossipsub(),
&self.network_globals,
);
);
// update sync metrics
metrics::update_sync_metrics(&self.network_globals);
metrics::update_sync_metrics(&self.network_globals, self.clock_slot());
}
_ = self.gossipsub_parameter_update.tick() => self.update_gossipsub_parameters(),
@@ -690,7 +690,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let mut subscribed_topics: Vec<GossipTopic> = vec![];
for topic_kind in core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
&self.network_globals.as_topic_config(),
&self.network_globals.as_topic_config(self.clock_slot()),
&self.fork_context.spec,
) {
for fork_digest in self.required_gossip_fork_digests() {
@@ -843,7 +843,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
fn subscribed_core_topics(&self) -> bool {
let core_topics = core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
&self.network_globals.as_topic_config(),
&self.network_globals.as_topic_config(self.clock_slot()),
&self.fork_context.spec,
);
let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics);
@@ -853,6 +853,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
core_topics.is_subset(&subscribed_topics)
}
fn clock_slot(&self) -> Slot {
self.beacon_chain.slot().unwrap_or(Slot::new(0))
}
}
/// Returns a `Sleep` that triggers after the next change in the beacon chain fork version.

View File

@@ -13,7 +13,7 @@ use parking_lot::RwLock;
use std::collections::HashSet;
use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{DataColumnSidecarList, SignedBeaconBlock};
use types::{DataColumnSidecarList, SignedBeaconBlock, Slot};
use super::single_block_lookup::{ComponentRequests, DownloadResult};
use super::SingleLookupId;
@@ -45,6 +45,7 @@ pub trait RequestState<T: BeaconChainTypes> {
id: Id,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
expected_blobs: usize,
block_slot: Slot,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError>;
@@ -80,6 +81,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
id: SingleLookupId,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
_: usize,
_: Slot,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.block_lookup_request(id, lookup_peers, self.requested_block_root)
@@ -128,6 +130,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
id: Id,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
expected_blobs: usize,
_: Slot,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.blob_lookup_request(id, lookup_peers, self.block_root, expected_blobs)
@@ -155,7 +158,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
match &mut request.component_requests {
ComponentRequests::WaitingForBlock => Err("waiting for block"),
ComponentRequests::ActiveBlobRequest(request, _) => Ok(request),
ComponentRequests::ActiveBlobRequest(request, ..) => Ok(request),
ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"),
ComponentRequests::NotNeeded { .. } => Err("not needed"),
}
@@ -176,9 +179,10 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
id: Id,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
_: usize,
block_slot: Slot,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.custody_lookup_request(id, self.block_root, lookup_peers)
cx.custody_lookup_request(id, self.block_root, block_slot, lookup_peers)
.map_err(LookupRequestError::SendFailedNetwork)
}
@@ -210,7 +214,7 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
match &mut request.component_requests {
ComponentRequests::WaitingForBlock => Err("waiting for block"),
ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"),
ComponentRequests::ActiveCustodyRequest(request) => Ok(request),
ComponentRequests::ActiveCustodyRequest(request, ..) => Ok(request),
ComponentRequests::NotNeeded { .. } => Err("not needed"),
}
}

View File

@@ -75,8 +75,8 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
#[derive(Debug)]
pub(crate) enum ComponentRequests<E: EthSpec> {
WaitingForBlock,
ActiveBlobRequest(BlobRequestState<E>, usize),
ActiveCustodyRequest(CustodyRequestState<E>),
ActiveBlobRequest(BlobRequestState<E>, usize, Slot),
ActiveCustodyRequest(CustodyRequestState<E>, usize, Slot),
// When printing in debug this state display the reason why it's not needed
#[allow(dead_code)]
NotNeeded(&'static str),
@@ -161,8 +161,10 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.block_request_state.state.is_processed()
&& match &self.component_requests {
ComponentRequests::WaitingForBlock => false,
ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(),
ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(),
ComponentRequests::ActiveBlobRequest(request, ..) => request.state.is_processed(),
ComponentRequests::ActiveCustodyRequest(request, ..) => {
request.state.is_processed()
}
ComponentRequests::NotNeeded { .. } => true,
}
}
@@ -176,10 +178,10 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// check if the`block_request_state.state.is_awaiting_event(). However we already
// checked that above, so `WaitingForBlock => false` is equivalent.
ComponentRequests::WaitingForBlock => false,
ComponentRequests::ActiveBlobRequest(request, _) => {
ComponentRequests::ActiveBlobRequest(request, ..) => {
request.state.is_awaiting_event()
}
ComponentRequests::ActiveCustodyRequest(request) => {
ComponentRequests::ActiveCustodyRequest(request, ..) => {
request.state.is_awaiting_event()
}
ComponentRequests::NotNeeded { .. } => false,
@@ -193,7 +195,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupResult, LookupRequestError> {
// TODO: Check what's necessary to download, specially for blobs
self.continue_request::<BlockRequestState<T::EthSpec>>(cx, 0)?;
self.continue_request::<BlockRequestState<T::EthSpec>>(cx, 0, Slot::new(0))?;
if let ComponentRequests::WaitingForBlock = self.component_requests {
let downloaded_block = self
@@ -213,6 +215,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}) {
let expected_blobs = block.num_expected_blobs();
let block_slot = block.slot();
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
if expected_blobs == 0 {
self.component_requests = ComponentRequests::NotNeeded("no data");
@@ -220,10 +223,13 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.component_requests = ComponentRequests::ActiveBlobRequest(
BlobRequestState::new(self.block_root),
expected_blobs,
block_slot,
);
} else if cx.chain.should_fetch_custody_columns(block_epoch) {
self.component_requests = ComponentRequests::ActiveCustodyRequest(
CustodyRequestState::new(self.block_root),
expected_blobs,
block_slot,
);
} else {
self.component_requests = ComponentRequests::NotNeeded("outside da window");
@@ -244,11 +250,19 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
match &self.component_requests {
ComponentRequests::WaitingForBlock => {} // do nothing
ComponentRequests::ActiveBlobRequest(_, expected_blobs) => {
self.continue_request::<BlobRequestState<T::EthSpec>>(cx, *expected_blobs)?
ComponentRequests::ActiveBlobRequest(_, expected_blobs, block_slot) => {
self.continue_request::<BlobRequestState<T::EthSpec>>(
cx,
*expected_blobs,
*block_slot,
)?
}
ComponentRequests::ActiveCustodyRequest(_) => {
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx, 0)?
ComponentRequests::ActiveCustodyRequest(_, expected_blobs, block_slot) => {
self.continue_request::<CustodyRequestState<T::EthSpec>>(
cx,
*expected_blobs,
*block_slot,
)?
}
ComponentRequests::NotNeeded { .. } => {} // do nothing
}
@@ -268,6 +282,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
&mut self,
cx: &mut SyncNetworkContext<T>,
expected_blobs: usize,
block_slot: Slot,
) -> Result<(), LookupRequestError> {
let id = self.id;
let awaiting_parent = self.awaiting_parent.is_some();
@@ -287,7 +302,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
let request = R::request_state_mut(self)
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
match request.make_request(id, peers, expected_blobs, cx)? {
match request.make_request(id, peers, expected_blobs, block_slot, cx)? {
LookupRequestResult::RequestSent(req_id) => {
// Lookup sync event safety: If make_request returns `RequestSent`, we are
// guaranteed that `BlockLookups::on_download_response` will be called exactly

View File

@@ -394,12 +394,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
id: self.next_id(),
requester,
};
let request_start_slot = Slot::new(*request.start_slot());
// Compute custody column peers before sending the blocks_by_range request. If we don't have
// enough peers, error here.
let data_column_requests = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) {
let column_indexes = self.network_globals().sampling_columns.clone();
Some(self.make_columns_by_range_requests(request.clone(), &column_indexes)?)
let column_indexes = self.network_globals().sampling_columns(request_start_slot);
Some(self.make_columns_by_range_requests(request.clone(), column_indexes)?)
} else {
None
};
@@ -430,10 +431,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Some((
data_column_requests,
self.network_globals()
.sampling_columns
.iter()
.cloned()
.collect::<Vec<_>>(),
.sampling_columns(request_start_slot)
.to_vec(),
))
} else {
None
@@ -448,7 +447,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
fn make_columns_by_range_requests(
&self,
request: BlocksByRangeRequest,
custody_indexes: &HashSet<ColumnIndex>,
custody_indexes: &[ColumnIndex],
) -> Result<HashMap<PeerId, DataColumnsByRangeRequest>, RpcRequestSendError> {
let mut peer_id_to_request_map = HashMap::new();
@@ -763,6 +762,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
lookup_id: SingleLookupId,
block_root: Hash256,
block_slot: Slot,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
) -> Result<LookupRequestResult, RpcRequestSendError> {
let span = span!(
@@ -781,9 +781,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// Include only the blob indexes not yet imported (received through gossip)
let custody_indexes_to_fetch = self
.network_globals()
.sampling_columns
.clone()
.into_iter()
.sampling_columns(block_slot)
.iter()
.copied()
.filter(|index| !custody_indexes_imported.contains(index))
.collect::<Vec<_>>();

View File

@@ -1130,7 +1130,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// Require peers on all sampling column subnets before sending batches
let peers_on_all_custody_subnets = network
.network_globals()
.sampling_subnets
.sampling_subnets(epoch.start_slot(T::EthSpec::slots_per_epoch()))
.iter()
.all(|subnet_id| {
let peer_count = network

View File

@@ -1203,11 +1203,12 @@ impl TestRig {
payload_verification_status: PayloadVerificationStatus::Verified,
is_valid_merge_transition_block: false,
};
let block_slot = block.slot();
let executed_block = AvailabilityPendingExecutedBlock::new(
block,
import_data,
payload_verification_outcome,
self.network_globals.custody_columns_count() as usize,
self.network_globals.custody_columns_count(block_slot) as usize,
);
match self
.harness