Fork aware max values in rpc (#6847)

N/A


  In https://github.com/sigp/lighthouse/pull/6329 we changed `max_blobs_per_block` from a preset to a config value.
We weren't using the right value based on fork in that PR. This is a follow up PR to use the fork dependent values.

In the proces, I also updated other places where we weren't using fork dependent values from the ChainSpec.

Note to reviewer: easier to go through by commit
This commit is contained in:
Pawan Dhananjay
2025-01-29 11:42:13 -08:00
committed by GitHub
parent e7ea69647a
commit 4a07c08c4f
16 changed files with 203 additions and 114 deletions

View File

@@ -659,24 +659,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"start_slot" => req.start_slot(),
);
// Should not send more than max request blocks
let max_request_size =
self.chain
.epoch()
.map_or(self.chain.spec.max_request_blocks, |epoch| {
if self.chain.spec.fork_name_at_epoch(epoch).deneb_enabled() {
self.chain.spec.max_request_blocks_deneb
} else {
self.chain.spec.max_request_blocks
}
});
if *req.count() > max_request_size {
return Err((
RpcErrorResponse::InvalidRequest,
"Request exceeded max size",
));
}
let forwards_block_root_iter = match self
.chain
.forwards_iter_block_roots(Slot::from(*req.start_slot()))

View File

@@ -28,7 +28,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock};
use types::{BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock};
/// Handles messages from the network and routes them to the appropriate service to be handled.
pub struct Router<T: BeaconChainTypes> {
@@ -90,6 +90,7 @@ impl<T: BeaconChainTypes> Router<T> {
invalid_block_storage: InvalidBlockStorage,
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
beacon_processor_reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
fork_context: Arc<ForkContext>,
log: slog::Logger,
) -> Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>, String> {
let message_handler_log = log.new(o!("service"=> "router"));
@@ -122,6 +123,7 @@ impl<T: BeaconChainTypes> Router<T> {
network_send.clone(),
network_beacon_processor.clone(),
sync_recv,
fork_context,
sync_logger,
);

View File

@@ -312,6 +312,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
invalid_block_storage,
beacon_processor_send,
beacon_processor_reprocess_tx,
fork_context.clone(),
network_log.clone(),
)?;

View File

@@ -69,7 +69,9 @@ use std::ops::Sub;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use types::{BlobSidecar, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot};
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot,
};
#[cfg(test)]
use types::ColumnIndex;
@@ -258,10 +260,11 @@ pub fn spawn<T: BeaconChainTypes>(
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
beacon_processor: Arc<NetworkBeaconProcessor<T>>,
sync_recv: mpsc::UnboundedReceiver<SyncMessage<T::EthSpec>>,
fork_context: Arc<ForkContext>,
log: slog::Logger,
) {
assert!(
beacon_chain.spec.max_request_blocks >= T::EthSpec::slots_per_epoch() * EPOCHS_PER_BATCH,
beacon_chain.spec.max_request_blocks(fork_context.current_fork()) as u64 >= T::EthSpec::slots_per_epoch() * EPOCHS_PER_BATCH,
"Max blocks that can be requested in a single batch greater than max allowed blocks in a single request"
);
@@ -272,6 +275,7 @@ pub fn spawn<T: BeaconChainTypes>(
beacon_processor,
sync_recv,
SamplingConfig::Default,
fork_context,
log.clone(),
);
@@ -287,6 +291,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
beacon_processor: Arc<NetworkBeaconProcessor<T>>,
sync_recv: mpsc::UnboundedReceiver<SyncMessage<T::EthSpec>>,
sampling_config: SamplingConfig,
fork_context: Arc<ForkContext>,
log: slog::Logger,
) -> Self {
let network_globals = beacon_processor.network_globals.clone();
@@ -297,6 +302,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
network_send,
beacon_processor.clone(),
beacon_chain.clone(),
fork_context.clone(),
log.clone(),
),
range_sync: RangeSync::new(

View File

@@ -43,8 +43,8 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256,
SignedBeaconBlock, Slot,
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, ForkContext,
Hash256, SignedBeaconBlock, Slot,
};
pub mod custody;
@@ -216,6 +216,8 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
pub chain: Arc<BeaconChain<T>>,
fork_context: Arc<ForkContext>,
/// Logger for the `SyncNetworkContext`.
pub log: slog::Logger,
}
@@ -244,6 +246,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
network_beacon_processor: Arc<NetworkBeaconProcessor<T>>,
chain: Arc<BeaconChain<T>>,
fork_context: Arc<ForkContext>,
log: slog::Logger,
) -> Self {
SyncNetworkContext {
@@ -257,6 +260,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
range_block_components_requests: FnvHashMap::default(),
network_beacon_processor,
chain,
fork_context,
log,
}
}
@@ -455,7 +459,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
(None, None)
};
// TODO(pawan): this would break if a batch contains multiple epochs
let max_blobs_len = self.chain.spec.max_blobs_per_block(epoch);
let info = RangeBlockComponentsRequest::new(
expected_blobs,
@@ -624,7 +627,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.network_send
.send(NetworkMessage::SendRequest {
peer_id,
request: RequestType::BlocksByRoot(request.into_request(&self.chain.spec)),
request: RequestType::BlocksByRoot(request.into_request(&self.fork_context)),
request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }),
})
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
@@ -706,7 +709,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.network_send
.send(NetworkMessage::SendRequest {
peer_id,
request: RequestType::BlobsByRoot(request.clone().into_request(&self.chain.spec)),
request: RequestType::BlobsByRoot(request.clone().into_request(&self.fork_context)),
request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }),
})
.map_err(|_| RpcRequestSendError::NetworkSendError)?;

View File

@@ -1,6 +1,6 @@
use lighthouse_network::rpc::methods::BlobsByRootRequest;
use std::sync::Arc;
use types::{blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256};
use types::{blob_sidecar::BlobIdentifier, BlobSidecar, EthSpec, ForkContext, Hash256};
use super::{ActiveRequestItems, LookupVerifyError};
@@ -11,7 +11,7 @@ pub struct BlobsByRootSingleBlockRequest {
}
impl BlobsByRootSingleBlockRequest {
pub fn into_request(self, spec: &ChainSpec) -> BlobsByRootRequest {
pub fn into_request(self, spec: &ForkContext) -> BlobsByRootRequest {
BlobsByRootRequest::new(
self.indices
.into_iter()

View File

@@ -1,7 +1,7 @@
use beacon_chain::get_block_root;
use lighthouse_network::rpc::BlocksByRootRequest;
use std::sync::Arc;
use types::{ChainSpec, EthSpec, Hash256, SignedBeaconBlock};
use types::{EthSpec, ForkContext, Hash256, SignedBeaconBlock};
use super::{ActiveRequestItems, LookupVerifyError};
@@ -9,8 +9,8 @@ use super::{ActiveRequestItems, LookupVerifyError};
pub struct BlocksByRootSingleRequest(pub Hash256);
impl BlocksByRootSingleRequest {
pub fn into_request(self, spec: &ChainSpec) -> BlocksByRootRequest {
BlocksByRootRequest::new(vec![self.0], spec)
pub fn into_request(self, fork_context: &ForkContext) -> BlocksByRootRequest {
BlocksByRootRequest::new(vec![self.0], fork_context)
}
}

View File

@@ -39,6 +39,7 @@ use lighthouse_network::{
use slog::info;
use slot_clock::{SlotClock, TestingSlotClock};
use tokio::sync::mpsc;
use types::ForkContext;
use types::{
data_column_sidecar::ColumnIndex,
test_utils::{SeedableRng, TestRandom, XorShiftRng},
@@ -92,6 +93,11 @@ impl TestRig {
.build();
let chain = harness.chain.clone();
let fork_context = Arc::new(ForkContext::new::<E>(
Slot::new(0),
chain.genesis_validators_root,
&chain.spec,
));
let (network_tx, network_rx) = mpsc::unbounded_channel();
let (sync_tx, sync_rx) = mpsc::unbounded_channel::<SyncMessage<E>>();
@@ -139,6 +145,7 @@ impl TestRig {
SamplingConfig::Custom {
required_successes: vec![SAMPLING_REQUIRED_SUCCESSES],
},
fork_context,
log.clone(),
),
harness,