From 7416d06dce8ed95f079bfd945887601f98e1fd3a Mon Sep 17 00:00:00 2001 From: chonghe <44791194+chong-he@users.noreply.github.com> Date: Wed, 11 Jun 2025 17:51:37 +0800 Subject: [PATCH 1/3] Add genesis sync test to CI (#7561) * #7550 Use existing code from @jimmygchen in #7530 and modify for genesis sync test. Thanks @jimmygchen ! --- .github/workflows/local-testnet.yml | 47 +++++- scripts/local_testnet/start_local_testnet.sh | 2 +- .../tests/genesis-sync-config-electra.yaml | 22 +++ scripts/tests/genesis-sync-config-fulu.yaml | 22 +++ scripts/tests/genesis-sync.sh | 151 ++++++++++++++++++ 5 files changed, 241 insertions(+), 3 deletions(-) create mode 100644 scripts/tests/genesis-sync-config-electra.yaml create mode 100644 scripts/tests/genesis-sync-config-fulu.yaml create mode 100755 scripts/tests/genesis-sync.sh diff --git a/.github/workflows/local-testnet.yml b/.github/workflows/local-testnet.yml index 7bd8b40d76..5cffb4e2fd 100644 --- a/.github/workflows/local-testnet.yml +++ b/.github/workflows/local-testnet.yml @@ -20,7 +20,7 @@ jobs: - name: Build Docker image run: | - docker build --build-arg FEATURES=portable -t lighthouse:local . + docker build --build-arg FEATURES=portable,spec-minimal -t lighthouse:local . docker save lighthouse:local -o lighthouse-docker.tar - name: Upload Docker image artifact @@ -213,6 +213,49 @@ jobs: scripts/local_testnet/logs retention-days: 3 + # Test syncing from genesis on a local testnet. Aims to cover forward syncing both short and long distances. + genesis-sync-test: + name: genesis-sync-test-${{ matrix.fork }}-${{ matrix.offline_secs }}s + runs-on: ubuntu-latest + needs: dockerfile-ubuntu + if: contains(github.event.pull_request.labels.*.name, 'syncing') + strategy: + matrix: + fork: [electra, fulu] + offline_secs: [120, 300] + steps: + - uses: actions/checkout@v4 + + - name: Install Kurtosis + run: | + echo "deb [trusted=yes] https://apt.fury.io/kurtosis-tech/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list + sudo apt update + sudo apt install -y kurtosis-cli + kurtosis analytics disable + + - name: Download Docker image artifact + uses: actions/download-artifact@v4 + with: + name: lighthouse-docker + path: . + + - name: Load Docker image + run: docker load -i lighthouse-docker.tar + + - name: Run the genesis sync test script + run: | + ./genesis-sync.sh "sync-${{ matrix.fork }}-${{ matrix.offline_secs }}s" "genesis-sync-config-${{ matrix.fork }}.yaml" "${{ matrix.fork }}" "${{ matrix.offline_secs }}" + working-directory: scripts/tests + + - name: Upload logs artifact + if: always() + uses: actions/upload-artifact@v4 + with: + name: logs-genesis-sync-${{ matrix.fork }}-${{ matrix.offline_secs }}s + path: | + scripts/local_testnet/logs + retention-days: 3 + # This job succeeds ONLY IF all others succeed. It is used by the merge queue to determine whether # a PR is safe to merge. New jobs should be added here. local-testnet-success: @@ -228,5 +271,5 @@ jobs: - uses: actions/checkout@v4 - name: Check that success job is dependent on all others run: | - exclude_jobs='checkpoint-sync-test' + exclude_jobs='checkpoint-sync-test|genesis-sync-test' ./scripts/ci/check-success-job.sh ./.github/workflows/local-testnet.yml local-testnet-success "$exclude_jobs" diff --git a/scripts/local_testnet/start_local_testnet.sh b/scripts/local_testnet/start_local_testnet.sh index 8e8859ca0e..442e6fd98d 100755 --- a/scripts/local_testnet/start_local_testnet.sh +++ b/scripts/local_testnet/start_local_testnet.sh @@ -81,7 +81,7 @@ fi if [ "$BUILD_IMAGE" = true ]; then echo "Building Lighthouse Docker image." ROOT_DIR="$SCRIPT_DIR/../.." - docker build --build-arg FEATURES=portable -f $ROOT_DIR/Dockerfile -t $LH_IMAGE_NAME $ROOT_DIR + docker build --build-arg FEATURES=portable,spec-minimal -f $ROOT_DIR/Dockerfile -t $LH_IMAGE_NAME $ROOT_DIR else echo "Not rebuilding Lighthouse Docker image." fi diff --git a/scripts/tests/genesis-sync-config-electra.yaml b/scripts/tests/genesis-sync-config-electra.yaml new file mode 100644 index 0000000000..153f754c94 --- /dev/null +++ b/scripts/tests/genesis-sync-config-electra.yaml @@ -0,0 +1,22 @@ +# Kurtosis config file for testing sync on a local devnet. +participants: + - cl_type: lighthouse + cl_image: lighthouse:local + count: 2 + # nodes without validators, used for testing sync. + - cl_type: lighthouse + cl_image: lighthouse:local + supernode: true # no supernode in Electra, this is for future proof + validator_count: 0 + - cl_type: lighthouse + cl_image: lighthouse:local + supernode: false + validator_count: 0 +network_params: + seconds_per_slot: 6 + electra_fork_epoch: 0 + preset: "minimal" +additional_services: + - tx_fuzz + - spamoor +global_log_level: debug diff --git a/scripts/tests/genesis-sync-config-fulu.yaml b/scripts/tests/genesis-sync-config-fulu.yaml new file mode 100644 index 0000000000..ccdc09c0d3 --- /dev/null +++ b/scripts/tests/genesis-sync-config-fulu.yaml @@ -0,0 +1,22 @@ +# Kurtosis config file for testing sync on a local devnet. +participants: + - cl_type: lighthouse + cl_image: lighthouse:local + count: 2 + # nodes without validators, used for testing sync. + - cl_type: lighthouse + cl_image: lighthouse:local + supernode: true + validator_count: 0 + - cl_type: lighthouse + cl_image: lighthouse:local + supernode: false + validator_count: 0 +network_params: + seconds_per_slot: 6 + fulu_fork_epoch: 0 + preset: "minimal" +additional_services: + - tx_fuzz + - spamoor +global_log_level: debug diff --git a/scripts/tests/genesis-sync.sh b/scripts/tests/genesis-sync.sh new file mode 100755 index 0000000000..39628c9e73 --- /dev/null +++ b/scripts/tests/genesis-sync.sh @@ -0,0 +1,151 @@ +#!/usr/bin/env bash +# +# Genesis sync test on a local network. +# +# Start a local testnet, shut down non-validator nodes for a period, then restart them +# and monitor their sync progress from genesis to head. +SCRIPT_DIR="$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" + +ENCLAVE_NAME=${1:-genesis-sync-testnet} +CONFIG=${2:-$SCRIPT_DIR/genesis-sync-config-electra.yaml} +FORK_TYPE=${3:-electra} # electra or fulu +OFFLINE_DURATION_SECS=${4:-120} # stopped duration of non validating nodes + +# Test configuration +# ------------------------------------------------------ +# Interval for polling the /lighthouse/syncing endpoint for sync status +# Reduce the polling time so that some progress can be seen +POLL_INTERVAL_SECS=0.5 +# Timeout for this test, if the nodes fail to sync, fail the test. +TIMEOUT_MINS=5 +TIMEOUT_SECS=$((TIMEOUT_MINS * 60)) +# ------------------------------------------------------ + +echo "Starting genesis sync test with:" +echo " Fork: $FORK_TYPE" +echo " Offline duration: ${OFFLINE_DURATION_SECS}s" + +# Polls a node's sync status +poll_node() { + local node_type=$1 + local url=${node_urls[$node_type]} + + response=$(curl -s "${url}/lighthouse/syncing" 2>/dev/null) + + if [ -z "$response" ] || [ "$response" = "null" ]; then + echo "${node_type} status: No response or null response" + return + fi + + # Print syncing status + sync_state=$(echo "$response" | jq -r 'if (.data | type) == "object" then "object" else "string" end' 2>/dev/null) + + if [ "$sync_state" = "object" ]; then + status=$(echo "$response" | jq -r '.data | keys[0] // "Unknown"') + fields=$(echo "$response" | jq -r ".data.${status} | to_entries | map(\"\(.key): \(.value)\") | join(\", \")") + echo "${node_type} status: ${status}, ${fields}" + else + status=$(echo "$response" | jq -r '.data' 2>/dev/null) + echo "${node_type} status: ${status:-Unknown}" + + # The test is complete when the node is synced + if [ "$status" = "Synced" ]; then + mark_node_complete "$node_type" + fi + fi +} + +# Marks a node as complete and record time +mark_node_complete() { + local node_type=$1 + if [ "${node_completed[$node_type]}" = false ]; then + node_completed[$node_type]=true + node_complete_time[$node_type]=$(date +%s) + echo "${node_type} completed sync in $((node_complete_time[$node_type] - sync_start_time)) seconds" + fi +} + +exit_and_dump_logs() { + local exit_code=$1 + echo "Shutting down..." + $SCRIPT_DIR/../local_testnet/stop_local_testnet.sh $ENCLAVE_NAME + echo "Test completed with exit code $exit_code." + exit $exit_code +} + +# Start the nodes +$SCRIPT_DIR/../local_testnet/start_local_testnet.sh -e $ENCLAVE_NAME -b false -n $CONFIG +if [ $? -ne 0 ]; then + echo "Failed to start local testnet" + exit_and_dump_logs 1 +fi + +# Wait for 10s before stopping non-validating nodes +sleep 10 + +# These are non validating nodes +supernode="cl-3-lighthouse-geth" +fullnode="cl-4-lighthouse-geth" + +# Stop the non-validator nodes +kurtosis service stop $ENCLAVE_NAME $supernode +kurtosis service stop $ENCLAVE_NAME $fullnode + +echo "Non-validator nodes stopped. Waiting ${OFFLINE_DURATION_SECS} seconds..." + +# Display the time every 10s when the nodes are stopped +remaining_time=$OFFLINE_DURATION_SECS +while [ $remaining_time -gt 0 ]; do + sleep 10 + remaining_time=$((remaining_time - 10)) + echo "Nodes are stopped for $((OFFLINE_DURATION_SECS - remaining_time))s, ${remaining_time}s remains..." +done + +echo "Resuming non-validator nodes..." + +# Resume the non validating nodes +kurtosis service start $ENCLAVE_NAME $supernode +kurtosis service start $ENCLAVE_NAME $fullnode + +# The time at which syncing starts after the node was stopped +sync_start_time=$(date +%s) + +# Get beacon API URLs for non validating nodes for query +supernode_url=$(kurtosis port print $ENCLAVE_NAME $supernode http) +fullnode_url=$(kurtosis port print $ENCLAVE_NAME $fullnode http) + +# Initialize statuses +declare -A node_completed +declare -A node_complete_time +declare -A node_urls + +node_urls["supernode"]="$supernode_url" +node_urls["fullnode"]="$fullnode_url" +node_completed["supernode"]=false +node_completed["fullnode"]=false + +echo "Polling sync status until nodes are synced or timeout of ${TIMEOUT_MINS} mins" + +while [ "${node_completed[supernode]}" = false ] || [ "${node_completed[fullnode]}" = false ]; do + current_time=$(date +%s) + elapsed=$((current_time - sync_start_time)) + + if [ "$elapsed" -ge "$TIMEOUT_SECS" ]; then + echo "ERROR: Nodes timed out syncing after ${TIMEOUT_MINS} minutes. Exiting." + exit_and_dump_logs 1 + fi + + # Poll each node that hasn't completed yet + for node in "supernode" "fullnode"; do + if [ "${node_completed[$node]}" = false ]; then + poll_node "$node" + fi + done + + sleep $POLL_INTERVAL_SECS +done + +echo "Genesis sync test complete! Both supernode and fullnode have synced successfully." +echo "Supernode time: $((node_complete_time[supernode] - sync_start_time)) seconds" +echo "Fullnode time: $((node_complete_time[fullnode] - sync_start_time)) seconds" +exit_and_dump_logs 0 \ No newline at end of file From 076a1c3faead931867088d3907be4ff253648949 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 11 Jun 2025 09:39:22 -0700 Subject: [PATCH 2/3] Data column sidecar event (#7587) N/A Implement events for data column sidecar https://github.com/ethereum/beacon-APIs/pull/535 --- beacon_node/beacon_chain/src/beacon_chain.rs | 53 ++++++++++-- .../src/data_column_verification.rs | 4 +- beacon_node/beacon_chain/src/events.rs | 15 ++++ beacon_node/beacon_chain/src/test_utils.rs | 2 +- beacon_node/beacon_chain/tests/events.rs | 84 ++++++++++++++++++- beacon_node/http_api/src/lib.rs | 3 + common/eth2/src/types.rs | 39 +++++++++ 7 files changed, 188 insertions(+), 12 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 990f4b6099..0c9eb33516 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -73,7 +73,9 @@ use crate::{ kzg_utils, metrics, AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore, BeaconSnapshot, CachedHead, }; -use eth2::types::{EventKind, SseBlobSidecar, SseBlock, SseExtendedPayloadAttributes}; +use eth2::types::{ + EventKind, SseBlobSidecar, SseBlock, SseDataColumnSidecar, SseExtendedPayloadAttributes, +}; use execution_layer::{ BlockProposalContents, BlockProposalContentsType, BuilderParams, ChainHealth, ExecutionLayer, FailedCondition, PayloadAttributes, PayloadStatus, @@ -3087,6 +3089,11 @@ impl BeaconChain { return Err(BlockError::DuplicateFullyImported(block_root)); } + self.emit_sse_data_column_sidecar_events( + &block_root, + data_columns.iter().map(|column| column.as_data_column()), + ); + let r = self .check_gossip_data_columns_availability_and_import( slot, @@ -3158,10 +3165,16 @@ impl BeaconChain { return Err(BlockError::DuplicateFullyImported(block_root)); } - // process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS - // consumers don't expect the blobs event to fire erratically. - if let EngineGetBlobsOutput::Blobs(blobs) = &engine_get_blobs_output { - self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().map(|b| b.as_blob())); + match &engine_get_blobs_output { + EngineGetBlobsOutput::Blobs(blobs) => { + self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().map(|b| b.as_blob())); + } + EngineGetBlobsOutput::CustodyColumns(columns) => { + self.emit_sse_data_column_sidecar_events( + &block_root, + columns.iter().map(|column| column.as_data_column()), + ); + } } let r = self @@ -3191,6 +3204,31 @@ impl BeaconChain { } } + fn emit_sse_data_column_sidecar_events<'a, I>( + self: &Arc, + block_root: &Hash256, + data_columns_iter: I, + ) where + I: Iterator>, + { + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_data_column_sidecar_subscribers() { + let imported_data_columns = self + .data_availability_checker + .cached_data_column_indexes(block_root) + .unwrap_or_default(); + let new_data_columns = + data_columns_iter.filter(|b| !imported_data_columns.contains(&b.index)); + + for data_column in new_data_columns { + event_handler.register(EventKind::DataColumnSidecar( + SseDataColumnSidecar::from_data_column_sidecar(data_column), + )); + } + } + } + } + /// Cache the columns in the processing cache, process it, then evict it from the cache if it was /// imported or errors. pub async fn process_rpc_custody_columns( @@ -3231,6 +3269,11 @@ impl BeaconChain { } } + self.emit_sse_data_column_sidecar_events( + &block_root, + custody_columns.iter().map(|column| column.as_ref()), + ); + let r = self .check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns) .await; diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 3e54ee9199..609e5bd796 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -215,8 +215,7 @@ impl GossipVerifiedDataColumn } /// Create a `GossipVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY. - #[cfg(test)] - pub(crate) fn __new_for_testing(column_sidecar: Arc>) -> Self { + pub fn __new_for_testing(column_sidecar: Arc>) -> Self { Self { block_root: column_sidecar.block_root(), data_column: KzgVerifiedDataColumn::__new_for_testing(column_sidecar), @@ -268,7 +267,6 @@ impl KzgVerifiedDataColumn { } /// Create a `KzgVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY. - #[cfg(test)] pub(crate) fn __new_for_testing(data_column: Arc>) -> Self { Self { data: data_column } } diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index d09b74e645..94ebfb4655 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -11,6 +11,7 @@ pub struct ServerSentEventHandler { single_attestation_tx: Sender>, block_tx: Sender>, blob_sidecar_tx: Sender>, + data_column_sidecar_tx: Sender>, finalized_tx: Sender>, head_tx: Sender>, exit_tx: Sender>, @@ -37,6 +38,7 @@ impl ServerSentEventHandler { let (single_attestation_tx, _) = broadcast::channel(capacity); let (block_tx, _) = broadcast::channel(capacity); let (blob_sidecar_tx, _) = broadcast::channel(capacity); + let (data_column_sidecar_tx, _) = broadcast::channel(capacity); let (finalized_tx, _) = broadcast::channel(capacity); let (head_tx, _) = broadcast::channel(capacity); let (exit_tx, _) = broadcast::channel(capacity); @@ -57,6 +59,7 @@ impl ServerSentEventHandler { single_attestation_tx, block_tx, blob_sidecar_tx, + data_column_sidecar_tx, finalized_tx, head_tx, exit_tx, @@ -99,6 +102,10 @@ impl ServerSentEventHandler { .blob_sidecar_tx .send(kind) .map(|count| log_count("blob sidecar", count)), + EventKind::DataColumnSidecar(_) => self + .data_column_sidecar_tx + .send(kind) + .map(|count| log_count("data_column_sidecar", count)), EventKind::FinalizedCheckpoint(_) => self .finalized_tx .send(kind) @@ -177,6 +184,10 @@ impl ServerSentEventHandler { self.blob_sidecar_tx.subscribe() } + pub fn subscribe_data_column_sidecar(&self) -> Receiver> { + self.data_column_sidecar_tx.subscribe() + } + pub fn subscribe_finalized(&self) -> Receiver> { self.finalized_tx.subscribe() } @@ -249,6 +260,10 @@ impl ServerSentEventHandler { self.blob_sidecar_tx.receiver_count() > 0 } + pub fn has_data_column_sidecar_subscribers(&self) -> bool { + self.data_column_sidecar_tx.receiver_count() > 0 + } + pub fn has_finalized_subscribers(&self) -> bool { self.finalized_tx.receiver_count() > 0 } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 3ee8c7ae3f..db6968b662 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -72,7 +72,7 @@ pub const FORK_NAME_ENV_VAR: &str = "FORK_NAME"; // Pre-computed data column sidecar using a single static blob from: // `beacon_node/execution_layer/src/test_utils/fixtures/mainnet/test_blobs_bundle.ssz` -const TEST_DATA_COLUMN_SIDECARS_SSZ: &[u8] = +pub const TEST_DATA_COLUMN_SIDECARS_SSZ: &[u8] = include_bytes!("test_utils/fixtures/test_data_column_sidecars.ssz"); // Default target aggregators to set during testing, this ensures an aggregator at each slot. diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs index c9bd55e062..5d0f22e252 100644 --- a/beacon_node/beacon_chain/tests/events.rs +++ b/beacon_node/beacon_chain/tests/events.rs @@ -1,11 +1,15 @@ use beacon_chain::blob_verification::GossipVerifiedBlob; -use beacon_chain::test_utils::BeaconChainHarness; -use eth2::types::{EventKind, SseBlobSidecar}; +use beacon_chain::data_column_verification::GossipVerifiedDataColumn; +use beacon_chain::test_utils::{BeaconChainHarness, TEST_DATA_COLUMN_SIDECARS_SSZ}; +use eth2::types::{EventKind, SseBlobSidecar, SseDataColumnSidecar}; use rand::rngs::StdRng; use rand::SeedableRng; use std::sync::Arc; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlobSidecar, EthSpec, ForkName, MinimalEthSpec}; +use types::test_utils::TestRandom; +use types::{ + BlobSidecar, DataColumnSidecar, EthSpec, ForkName, MinimalEthSpec, RuntimeVariableList, +}; type E = MinimalEthSpec; @@ -43,6 +47,42 @@ async fn blob_sidecar_event_on_process_gossip_blob() { assert_eq!(sidecar_event, EventKind::BlobSidecar(expected_sse_blobs)); } +/// Verifies that a data column event is emitted when a gossip verified data column is received via gossip or the publish block API. +#[tokio::test] +async fn data_column_sidecar_event_on_process_gossip_data_column() { + let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec) + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + // subscribe to blob sidecar events + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut data_column_event_receiver = event_handler.subscribe_data_column_sidecar(); + + // build and process a gossip verified data column + let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); + let sidecar = Arc::new(DataColumnSidecar::random_for_test(&mut rng)); + let gossip_verified_data_column = GossipVerifiedDataColumn::__new_for_testing(sidecar); + let expected_sse_data_column = SseDataColumnSidecar::from_data_column_sidecar( + gossip_verified_data_column.as_data_column(), + ); + + let _ = harness + .chain + .process_gossip_data_columns(vec![gossip_verified_data_column], || Ok(())) + .await + .unwrap(); + + let sidecar_event = data_column_event_receiver.try_recv().unwrap(); + assert_eq!( + sidecar_event, + EventKind::DataColumnSidecar(expected_sse_data_column) + ); +} + /// Verifies that a blob event is emitted when blobs are received via RPC. #[tokio::test] async fn blob_sidecar_event_on_process_rpc_blobs() { @@ -95,3 +135,41 @@ async fn blob_sidecar_event_on_process_rpc_blobs() { } assert_eq!(sse_blobs, expected_sse_blobs); } + +#[tokio::test] +async fn data_column_sidecar_event_on_process_rpc_columns() { + let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec.clone()) + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + // subscribe to blob sidecar events + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut data_column_event_receiver = event_handler.subscribe_data_column_sidecar(); + + // load the precomputed column sidecar to avoid computing them for every block in the tests. + let mut sidecar = RuntimeVariableList::>::from_ssz_bytes( + TEST_DATA_COLUMN_SIDECARS_SSZ, + spec.number_of_columns as usize, + ) + .unwrap()[0] + .clone(); + let parent_root = harness.chain.head().head_block_root(); + sidecar.signed_block_header.message.parent_root = parent_root; + let expected_sse_data_column = SseDataColumnSidecar::from_data_column_sidecar(&sidecar); + + let _ = harness + .chain + .process_rpc_custody_columns(vec![Arc::new(sidecar)]) + .await + .unwrap(); + + let sidecar_event = data_column_event_receiver.try_recv().unwrap(); + assert_eq!( + sidecar_event, + EventKind::DataColumnSidecar(expected_sse_data_column) + ); +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 2eaa33a964..b220685b86 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -4741,6 +4741,9 @@ pub fn serve( api_types::EventTopic::BlobSidecar => { event_handler.subscribe_blob_sidecar() } + api_types::EventTopic::DataColumnSidecar => { + event_handler.subscribe_data_column_sidecar() + } api_types::EventTopic::Attestation => { event_handler.subscribe_attestation() } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index b8c74d4dcd..f7bda17eb1 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -960,6 +960,35 @@ impl SseBlobSidecar { } } +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct SseDataColumnSidecar { + pub block_root: Hash256, + #[serde(with = "serde_utils::quoted_u64")] + pub index: u64, + pub slot: Slot, + pub kzg_commitments: Vec, + pub versioned_hashes: Vec, +} + +impl SseDataColumnSidecar { + pub fn from_data_column_sidecar( + data_column_sidecar: &DataColumnSidecar, + ) -> SseDataColumnSidecar { + let kzg_commitments = data_column_sidecar.kzg_commitments.to_vec(); + let versioned_hashes = kzg_commitments + .iter() + .map(|c| c.calculate_versioned_hash()) + .collect(); + SseDataColumnSidecar { + block_root: data_column_sidecar.block_root(), + index: data_column_sidecar.index, + slot: data_column_sidecar.slot(), + kzg_commitments, + versioned_hashes, + } + } +} + #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] pub struct SseFinalizedCheckpoint { pub block: Hash256, @@ -1110,6 +1139,7 @@ pub enum EventKind { SingleAttestation(Box), Block(SseBlock), BlobSidecar(SseBlobSidecar), + DataColumnSidecar(SseDataColumnSidecar), FinalizedCheckpoint(SseFinalizedCheckpoint), Head(SseHead), VoluntaryExit(SignedVoluntaryExit), @@ -1133,6 +1163,7 @@ impl EventKind { EventKind::Head(_) => "head", EventKind::Block(_) => "block", EventKind::BlobSidecar(_) => "blob_sidecar", + EventKind::DataColumnSidecar(_) => "data_column_sidecar", EventKind::Attestation(_) => "attestation", EventKind::SingleAttestation(_) => "single_attestation", EventKind::VoluntaryExit(_) => "voluntary_exit", @@ -1168,6 +1199,11 @@ impl EventKind { "blob_sidecar" => Ok(EventKind::BlobSidecar(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Blob Sidecar: {:?}", e)), )?)), + "data_column_sidecar" => Ok(EventKind::DataColumnSidecar( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("Data Column Sidecar: {:?}", e)) + })?, + )), "chain_reorg" => Ok(EventKind::ChainReorg(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Chain Reorg: {:?}", e)), )?)), @@ -1257,6 +1293,7 @@ pub enum EventTopic { Head, Block, BlobSidecar, + DataColumnSidecar, Attestation, SingleAttestation, VoluntaryExit, @@ -1283,6 +1320,7 @@ impl FromStr for EventTopic { "head" => Ok(EventTopic::Head), "block" => Ok(EventTopic::Block), "blob_sidecar" => Ok(EventTopic::BlobSidecar), + "data_column_sidecar" => Ok(EventTopic::DataColumnSidecar), "attestation" => Ok(EventTopic::Attestation), "single_attestation" => Ok(EventTopic::SingleAttestation), "voluntary_exit" => Ok(EventTopic::VoluntaryExit), @@ -1310,6 +1348,7 @@ impl fmt::Display for EventTopic { EventTopic::Head => write!(f, "head"), EventTopic::Block => write!(f, "block"), EventTopic::BlobSidecar => write!(f, "blob_sidecar"), + EventTopic::DataColumnSidecar => write!(f, "data_column_sidecar"), EventTopic::Attestation => write!(f, "attestation"), EventTopic::SingleAttestation => write!(f, "single_attestation"), EventTopic::VoluntaryExit => write!(f, "voluntary_exit"), From 5f208bb8582984bc762456eeb91e41d051fa9630 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 11 Jun 2025 11:10:06 -0700 Subject: [PATCH 3/3] Implement basic validator custody framework (no backfill) (#7578) Resolves #6767 This PR implements a basic version of validator custody. - It introduces a new `CustodyContext` object which contains info regarding number of validators attached to a node and the custody count they contribute to the cgc. - The `CustodyContext` is added in the da_checker and has methods for returning the current cgc and the number of columns to sample at head. Note that the logic for returning the cgc existed previously in the network globals. - To estimate the number of validators attached, we use the `beacon_committee_subscriptions` endpoint. This might overestimate the number of validators actually publishing attestations from the node in the case of multi BN setups. We could also potentially use the `publish_attestations` endpoint to get a more conservative estimate at a later point. - Anytime there's a change in the `custody_group_count` due to addition/removal of validators, the custody context should send an event on a broadcast channnel. The only subscriber for the channel exists in the network service which simply subscribes to more subnets. There can be additional subscribers in sync that will start a backfill once the cgc changes. TODO - [ ] **NOT REQUIRED:** Currently, the logic only handles an increase in validator count and does not handle a decrease. We should ideally unsubscribe from subnets when the cgc has decreased. - [ ] **NOT REQUIRED:** Add a service in the `CustodyContext` that emits an event once `MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS ` passes after updating the current cgc. This event should be picked up by a subscriber which updates the enr and metadata. - [x] Add more tests --- beacon_node/beacon_chain/src/beacon_chain.rs | 25 +- .../beacon_chain/src/block_verification.rs | 24 +- .../src/block_verification_types.rs | 16 - beacon_node/beacon_chain/src/builder.rs | 26 +- .../src/data_availability_checker.rs | 43 +- .../overflow_lru_cache.rs | 71 +-- .../state_lru_cache.rs | 8 - beacon_node/beacon_chain/src/lib.rs | 3 + .../beacon_chain/src/persisted_custody.rs | 46 ++ beacon_node/beacon_chain/src/test_utils.rs | 35 +- .../beacon_chain/src/validator_custody.rs | 447 ++++++++++++++++++ .../beacon_chain/tests/block_verification.rs | 95 +--- .../tests/payload_invalidation.rs | 15 +- beacon_node/beacon_chain/tests/store_tests.rs | 12 +- beacon_node/http_api/src/lib.rs | 47 +- beacon_node/http_api/src/publish_blocks.rs | 11 +- .../tests/broadcast_validation_tests.rs | 22 +- .../lighthouse_network/src/discovery/mod.rs | 10 + .../lighthouse_network/src/service/mod.rs | 66 ++- .../lighthouse_network/src/types/globals.rs | 72 ++- .../lighthouse_network/src/types/topics.rs | 10 +- .../lighthouse_network/tests/common.rs | 3 +- beacon_node/network/src/metrics.rs | 4 +- .../gossip_methods.rs | 5 +- .../src/network_beacon_processor/mod.rs | 9 +- .../src/network_beacon_processor/tests.rs | 20 +- beacon_node/network/src/service.rs | 26 +- .../src/sync/block_sidecar_coupling.rs | 12 +- .../network/src/sync/network_context.rs | 13 +- .../network/src/sync/range_sync/chain.rs | 2 +- beacon_node/network/src/sync/tests/lookups.rs | 8 +- beacon_node/network/src/sync/tests/range.rs | 12 +- beacon_node/store/src/lib.rs | 3 + consensus/types/src/chain_spec.rs | 34 +- scripts/local_testnet/network_params_das.yaml | 4 +- .../tests/checkpoint-sync-config-devnet.yaml | 8 +- scripts/tests/genesis-sync-config-fulu.yaml | 9 +- testing/ef_tests/src/cases/fork_choice.rs | 2 +- 38 files changed, 928 insertions(+), 350 deletions(-) create mode 100644 beacon_node/beacon_chain/src/persisted_custody.rs create mode 100644 beacon_node/beacon_chain/src/validator_custody.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0c9eb33516..50efb367a8 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -58,12 +58,14 @@ use crate::observed_data_sidecars::ObservedDataSidecars; use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::observed_slashable::ObservedSlashable; use crate::persisted_beacon_chain::PersistedBeaconChain; +use crate::persisted_custody::persist_custody_context; use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::sync_committee_verification::{ Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution, }; +use crate::validator_custody::CustodyContextSsz; use crate::validator_monitor::{ get_slot_delay_ms, timestamp_now, ValidatorMonitor, HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS, @@ -670,6 +672,23 @@ impl BeaconChain { Ok(()) } + /// Persists the custody information to disk. + pub fn persist_custody_context(&self) -> Result<(), Error> { + let custody_context: CustodyContextSsz = self + .data_availability_checker + .custody_context() + .as_ref() + .into(); + debug!(?custody_context, "Persisting custody context to store"); + + persist_custody_context::( + self.store.clone(), + custody_context, + )?; + + Ok(()) + } + /// Returns the slot _right now_ according to `self.slot_clock`. Returns `Err` if the slot is /// unavailable. /// @@ -2990,7 +3009,6 @@ impl BeaconChain { pub async fn verify_block_for_gossip( self: &Arc, block: Arc>, - custody_columns_count: usize, ) -> Result, BlockError> { let chain = self.clone(); self.task_executor @@ -3000,7 +3018,7 @@ impl BeaconChain { let slot = block.slot(); let graffiti_string = block.message().body().graffiti().as_utf8_lossy(); - match GossipVerifiedBlock::new(block, &chain, custody_columns_count) { + match GossipVerifiedBlock::new(block, &chain) { Ok(verified) => { let commitments_formatted = verified.block.commitments_formatted(); debug!( @@ -7232,7 +7250,8 @@ impl Drop for BeaconChain { let drop = || -> Result<(), Error> { self.persist_fork_choice()?; self.persist_op_pool()?; - self.persist_eth1_cache() + self.persist_eth1_cache()?; + self.persist_custody_context() }; if let Err(e) = drop() { diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 1bbf845fa5..ba501f617d 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -685,7 +685,6 @@ pub struct GossipVerifiedBlock { pub block_root: Hash256, parent: Option>, consensus_context: ConsensusContext, - custody_columns_count: usize, } /// A wrapper around a `SignedBeaconBlock` that indicates that all signatures (except the deposit @@ -721,7 +720,6 @@ pub trait IntoGossipVerifiedBlock: Sized { fn into_gossip_verified_block( self, chain: &BeaconChain, - custody_columns_count: usize, ) -> Result, BlockError>; fn inner_block(&self) -> Arc>; } @@ -730,7 +728,6 @@ impl IntoGossipVerifiedBlock for GossipVerifiedBlock fn into_gossip_verified_block( self, _chain: &BeaconChain, - _custody_columns_count: usize, ) -> Result, BlockError> { Ok(self) } @@ -743,9 +740,8 @@ impl IntoGossipVerifiedBlock for Arc, - custody_columns_count: usize, ) -> Result, BlockError> { - GossipVerifiedBlock::new(self, chain, custody_columns_count) + GossipVerifiedBlock::new(self, chain) } fn inner_block(&self) -> Arc> { @@ -821,7 +817,6 @@ impl GossipVerifiedBlock { pub fn new( block: Arc>, chain: &BeaconChain, - custody_columns_count: usize, ) -> Result { // If the block is valid for gossip we don't supply it to the slasher here because // we assume it will be transformed into a fully verified block. We *do* need to supply @@ -831,14 +826,12 @@ impl GossipVerifiedBlock { // The `SignedBeaconBlock` and `SignedBeaconBlockHeader` have the same canonical root, // but it's way quicker to calculate root of the header since the hash of the tree rooted // at `BeaconBlockBody` is already computed in the header. - Self::new_without_slasher_checks(block, &header, chain, custody_columns_count).map_err( - |e| { - process_block_slash_info::<_, BlockError>( - chain, - BlockSlashInfo::from_early_error_block(header, e), - ) - }, - ) + Self::new_without_slasher_checks(block, &header, chain).map_err(|e| { + process_block_slash_info::<_, BlockError>( + chain, + BlockSlashInfo::from_early_error_block(header, e), + ) + }) } /// As for new, but doesn't pass the block to the slasher. @@ -846,7 +839,6 @@ impl GossipVerifiedBlock { block: Arc>, block_header: &SignedBeaconBlockHeader, chain: &BeaconChain, - custody_columns_count: usize, ) -> Result { // Ensure the block is the correct structure for the fork at `block.slot()`. block @@ -1054,7 +1046,6 @@ impl GossipVerifiedBlock { block_root, parent, consensus_context, - custody_columns_count, }) } @@ -1202,7 +1193,6 @@ impl SignatureVerifiedBlock { block: MaybeAvailableBlock::AvailabilityPending { block_root: from.block_root, block, - custody_columns_count: from.custody_columns_count, }, block_root: from.block_root, parent: Some(parent), diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index dab54dc823..f7002dcee1 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -31,7 +31,6 @@ use types::{ pub struct RpcBlock { block_root: Hash256, block: RpcBlockInner, - custody_columns_count: usize, } impl Debug for RpcBlock { @@ -45,10 +44,6 @@ impl RpcBlock { self.block_root } - pub fn custody_columns_count(&self) -> usize { - self.custody_columns_count - } - pub fn as_block(&self) -> &SignedBeaconBlock { match &self.block { RpcBlockInner::Block(block) => block, @@ -103,14 +98,12 @@ impl RpcBlock { pub fn new_without_blobs( block_root: Option, block: Arc>, - custody_columns_count: usize, ) -> Self { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); Self { block_root, block: RpcBlockInner::Block(block), - custody_columns_count, } } @@ -152,8 +145,6 @@ impl RpcBlock { Ok(Self { block_root, block: inner, - // Block is before PeerDAS - custody_columns_count: 0, }) } @@ -161,7 +152,6 @@ impl RpcBlock { block_root: Option, block: Arc>, custody_columns: Vec>, - custody_columns_count: usize, spec: &ChainSpec, ) -> Result { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); @@ -182,7 +172,6 @@ impl RpcBlock { Ok(Self { block_root, block: inner, - custody_columns_count, }) } @@ -250,12 +239,10 @@ impl ExecutedBlock { MaybeAvailableBlock::AvailabilityPending { block_root: _, block: pending_block, - custody_columns_count, } => Self::AvailabilityPending(AvailabilityPendingExecutedBlock::new( pending_block, import_data, payload_verification_outcome, - custody_columns_count, )), } } @@ -321,7 +308,6 @@ pub struct AvailabilityPendingExecutedBlock { pub block: Arc>, pub import_data: BlockImportData, pub payload_verification_outcome: PayloadVerificationOutcome, - pub custody_columns_count: usize, } impl AvailabilityPendingExecutedBlock { @@ -329,13 +315,11 @@ impl AvailabilityPendingExecutedBlock { block: Arc>, import_data: BlockImportData, payload_verification_outcome: PayloadVerificationOutcome, - custody_columns_count: usize, ) -> Self { Self { block, import_data, payload_verification_outcome, - custody_columns_count, } } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 2346aca00b..ae07584f27 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -13,10 +13,12 @@ use crate::light_client_server_cache::LightClientServerCache; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::observed_data_sidecars::ObservedDataSidecars; use crate::persisted_beacon_chain::PersistedBeaconChain; +use crate::persisted_custody::load_custody_context; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::validator_monitor::{ValidatorMonitor, ValidatorMonitorConfig}; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::ChainConfig; +use crate::CustodyContext; use crate::{ BeaconChain, BeaconChainTypes, BeaconForkChoiceStore, BeaconSnapshot, Eth1Chain, Eth1ChainBackend, ServerSentEventHandler, @@ -926,6 +928,20 @@ where } }; + // Load the persisted custody context from the db and initialize + // the context for this run + let custody_context = if let Some(custody) = + load_custody_context::(store.clone()) + { + Arc::new(CustodyContext::new_from_persisted_custody_context( + custody, + self.import_all_data_columns, + )) + } else { + Arc::new(CustodyContext::new(self.import_all_data_columns)) + }; + debug!(?custody_context, "Loading persisted custody context"); + let beacon_chain = BeaconChain { spec: self.spec.clone(), config: self.chain_config, @@ -999,8 +1015,14 @@ where validator_monitor: RwLock::new(validator_monitor), genesis_backfill_slot, data_availability_checker: Arc::new( - DataAvailabilityChecker::new(slot_clock, self.kzg.clone(), store, self.spec) - .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, + DataAvailabilityChecker::new( + slot_clock, + self.kzg.clone(), + store, + custody_context, + self.spec, + ) + .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, ), kzg: self.kzg.clone(), rng: Arc::new(Mutex::new(rng)), diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 0fd417389b..91ff5fb644 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -5,7 +5,7 @@ use crate::block_verification_types::{ use crate::data_availability_checker::overflow_lru_cache::{ DataAvailabilityCheckerInner, ReconstructColumnsDecision, }; -use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore}; +use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore, CustodyContext}; use kzg::Kzg; use slot_clock::SlotClock; use std::fmt; @@ -74,6 +74,7 @@ pub struct DataAvailabilityChecker { availability_cache: Arc>, slot_clock: T::SlotClock, kzg: Arc, + custody_context: Arc, spec: Arc, } @@ -111,17 +112,28 @@ impl DataAvailabilityChecker { slot_clock: T::SlotClock, kzg: Arc, store: BeaconStore, + custody_context: Arc, spec: Arc, ) -> Result { - let inner = DataAvailabilityCheckerInner::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?; + let inner = DataAvailabilityCheckerInner::new( + OVERFLOW_LRU_CAPACITY, + store, + custody_context.clone(), + spec.clone(), + )?; Ok(Self { availability_cache: Arc::new(inner), slot_clock, kzg, + custody_context, spec, }) } + pub fn custody_context(&self) -> Arc { + self.custody_context.clone() + } + /// Checks if the block root is currenlty in the availability cache awaiting import because /// of missing components. pub fn get_execution_valid_block( @@ -297,7 +309,6 @@ impl DataAvailabilityChecker { &self, block: RpcBlock, ) -> Result, AvailabilityCheckError> { - let custody_columns_count = block.custody_columns_count(); let (block_root, block, blobs, data_columns) = block.deconstruct(); if self.blobs_required_for_block(&block) { return if let Some(blob_list) = blobs { @@ -311,11 +322,7 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), })) } else { - Ok(MaybeAvailableBlock::AvailabilityPending { - block_root, - block, - custody_columns_count, - }) + Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) }; } if self.data_columns_required_for_block(&block) { @@ -340,11 +347,7 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), })) } else { - Ok(MaybeAvailableBlock::AvailabilityPending { - block_root, - block, - custody_columns_count, - }) + Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) }; } @@ -401,7 +404,6 @@ impl DataAvailabilityChecker { } for block in blocks { - let custody_columns_count = block.custody_columns_count(); let (block_root, block, blobs, data_columns) = block.deconstruct(); let maybe_available_block = if self.blobs_required_for_block(&block) { @@ -414,11 +416,7 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), }) } else { - MaybeAvailableBlock::AvailabilityPending { - block_root, - block, - custody_columns_count, - } + MaybeAvailableBlock::AvailabilityPending { block_root, block } } } else if self.data_columns_required_for_block(&block) { if let Some(data_columns) = data_columns { @@ -432,11 +430,7 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), }) } else { - MaybeAvailableBlock::AvailabilityPending { - block_root, - block, - custody_columns_count, - } + MaybeAvailableBlock::AvailabilityPending { block_root, block } } } else { MaybeAvailableBlock::Available(AvailableBlock { @@ -786,7 +780,6 @@ pub enum MaybeAvailableBlock { AvailabilityPending { block_root: Hash256, block: Arc>, - custody_columns_count: usize, }, } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 3478c183f3..36c4f2cdc1 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -8,6 +8,7 @@ use crate::block_verification_types::{ use crate::data_availability_checker::{Availability, AvailabilityCheckError}; use crate::data_column_verification::KzgVerifiedCustodyDataColumn; use crate::BeaconChainTypes; +use crate::CustodyContext; use lru::LruCache; use parking_lot::RwLock; use std::cmp::Ordering; @@ -158,6 +159,7 @@ impl PendingComponents { pub fn make_available( &mut self, spec: &Arc, + num_expected_columns: u64, recover: R, ) -> Result>, AvailabilityCheckError> where @@ -171,12 +173,11 @@ impl PendingComponents { }; let num_expected_blobs = block.num_blobs_expected(); - + let num_expected_columns = num_expected_columns as usize; let blob_data = if num_expected_blobs == 0 { Some(AvailableBlockData::NoData) } else if spec.is_peer_das_enabled_for_epoch(block.epoch()) { let num_received_columns = self.verified_data_columns.len(); - let num_expected_columns = block.custody_columns_count(); match num_received_columns.cmp(&num_expected_columns) { Ordering::Greater => { // Should never happen @@ -254,7 +255,6 @@ impl PendingComponents { block, import_data, payload_verification_outcome, - custody_columns_count: _, } = recover(block.clone())?; let available_block = AvailableBlock { @@ -308,19 +308,21 @@ impl PendingComponents { }) } - pub fn status_str(&self, block_epoch: Epoch, spec: &ChainSpec) -> String { + pub fn status_str( + &self, + block_epoch: Epoch, + num_expected_columns: Option, + spec: &ChainSpec, + ) -> String { let block_count = if self.executed_block.is_some() { 1 } else { 0 }; if spec.is_peer_das_enabled_for_epoch(block_epoch) { - let custody_columns_count = if let Some(block) = self.get_cached_block() { - &block.custody_columns_count().to_string() - } else { - "?" - }; format!( "block {} data_columns {}/{}", block_count, self.verified_data_columns.len(), - custody_columns_count, + num_expected_columns + .map(|c| c.to_string()) + .unwrap_or("?".into()) ) } else { let num_expected_blobs = if let Some(block) = self.get_cached_block() { @@ -346,6 +348,7 @@ pub struct DataAvailabilityCheckerInner { /// This cache holds a limited number of states in memory and reconstructs them /// from disk when necessary. This is necessary until we merge tree-states state_cache: StateLRUCache, + custody_context: Arc, spec: Arc, } @@ -362,11 +365,13 @@ impl DataAvailabilityCheckerInner { pub fn new( capacity: NonZeroUsize, beacon_store: BeaconStore, + custody_context: Arc, spec: Arc, ) -> Result { Ok(Self { critical: RwLock::new(LruCache::new(capacity)), state_cache: StateLRUCache::new(beacon_store, spec.clone()), + custody_context, spec, }) } @@ -470,13 +475,15 @@ impl DataAvailabilityCheckerInner { debug!( component = "blobs", ?block_root, - status = pending_components.status_str(epoch, &self.spec), + status = pending_components.status_str(epoch, None, &self.spec), "Component added to data availability checker" ); - if let Some(available_block) = pending_components.make_available(&self.spec, |block| { - self.state_cache.recover_pending_executed_block(block) - })? { + if let Some(available_block) = pending_components.make_available( + &self.spec, + self.custody_context.sampling_size(Some(epoch), &self.spec), + |block| self.state_cache.recover_pending_executed_block(block), + )? { // We keep the pending components in the availability cache during block import (#5845). write_lock.put(block_root, pending_components); drop(write_lock); @@ -519,16 +526,19 @@ impl DataAvailabilityCheckerInner { // Merge in the data columns. pending_components.merge_data_columns(kzg_verified_data_columns)?; + let num_expected_columns = self.custody_context.sampling_size(Some(epoch), &self.spec); debug!( component = "data_columns", ?block_root, - status = pending_components.status_str(epoch, &self.spec), + status = pending_components.status_str(epoch, Some(num_expected_columns), &self.spec), "Component added to data availability checker" ); - if let Some(available_block) = pending_components.make_available(&self.spec, |block| { - self.state_cache.recover_pending_executed_block(block) - })? { + if let Some(available_block) = + pending_components.make_available(&self.spec, num_expected_columns, |block| { + self.state_cache.recover_pending_executed_block(block) + })? + { // We keep the pending components in the availability cache during block import (#5845). write_lock.put(block_root, pending_components); drop(write_lock); @@ -612,17 +622,20 @@ impl DataAvailabilityCheckerInner { // Merge in the block. pending_components.merge_block(diet_executed_block); + let num_expected_columns = self.custody_context.sampling_size(Some(epoch), &self.spec); debug!( component = "block", ?block_root, - status = pending_components.status_str(epoch, &self.spec), + status = pending_components.status_str(epoch, Some(num_expected_columns), &self.spec), "Component added to data availability checker" ); // Check if we have all components and entire set is consistent. - if let Some(available_block) = pending_components.make_available(&self.spec, |block| { - self.state_cache.recover_pending_executed_block(block) - })? { + if let Some(available_block) = pending_components.make_available( + &self.spec, + self.custody_context.sampling_size(Some(epoch), &self.spec), + |block| self.state_cache.recover_pending_executed_block(block), + )? { // We keep the pending components in the availability cache during block import (#5845). write_lock.put(block_root, pending_components); drop(write_lock); @@ -700,7 +713,6 @@ mod test { use types::{ExecPayload, MinimalEthSpec}; const LOW_VALIDATOR_COUNT: usize = 32; - const DEFAULT_TEST_CUSTODY_COLUMN_COUNT: usize = 8; fn get_store_with_spec( db_path: &TempDir, @@ -861,7 +873,6 @@ mod test { block, import_data, payload_verification_outcome, - custody_columns_count: DEFAULT_TEST_CUSTODY_COLUMN_COUNT, }; (availability_pending_block, gossip_verified_blobs) @@ -888,9 +899,15 @@ mod test { let spec = harness.spec.clone(); let test_store = harness.chain.store.clone(); let capacity_non_zero = new_non_zero_usize(capacity); + let custody_context = Arc::new(CustodyContext::new(false)); let cache = Arc::new( - DataAvailabilityCheckerInner::::new(capacity_non_zero, test_store, spec.clone()) - .expect("should create cache"), + DataAvailabilityCheckerInner::::new( + capacity_non_zero, + test_store, + custody_context, + spec.clone(), + ) + .expect("should create cache"), ); (harness, cache, chain_db_path) } @@ -1239,8 +1256,6 @@ mod pending_components_tests { payload_verification_status: PayloadVerificationStatus::Verified, is_valid_merge_transition_block: false, }, - // Default custody columns count, doesn't matter here - custody_columns_count: 8, }; (block.into(), blobs, invalid_blobs) } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs index 5fe674f30c..f73857f468 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs @@ -24,7 +24,6 @@ pub struct DietAvailabilityPendingExecutedBlock { parent_eth1_finalization_data: Eth1FinalizationData, consensus_context: OnDiskConsensusContext, payload_verification_outcome: PayloadVerificationOutcome, - custody_columns_count: usize, } /// just implementing the same methods as `AvailabilityPendingExecutedBlock` @@ -54,10 +53,6 @@ impl DietAvailabilityPendingExecutedBlock { .unwrap_or_default() } - pub fn custody_columns_count(&self) -> usize { - self.custody_columns_count - } - /// Returns the epoch corresponding to `self.slot()`. pub fn epoch(&self) -> Epoch { self.block.slot().epoch(E::slots_per_epoch()) @@ -107,7 +102,6 @@ impl StateLRUCache { executed_block.import_data.consensus_context, ), payload_verification_outcome: executed_block.payload_verification_outcome, - custody_columns_count: executed_block.custody_columns_count, } } @@ -137,7 +131,6 @@ impl StateLRUCache { .into_consensus_context(), }, payload_verification_outcome: diet_executed_block.payload_verification_outcome, - custody_columns_count: diet_executed_block.custody_columns_count, }) } @@ -224,7 +217,6 @@ impl From> value.import_data.consensus_context, ), payload_verification_outcome: value.payload_verification_outcome, - custody_columns_count: value.custody_columns_count, } } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 5b79312d37..0eec6dc770 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -48,6 +48,7 @@ pub mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; mod persisted_beacon_chain; +pub mod persisted_custody; mod persisted_fork_choice; mod pre_finalization_cache; pub mod proposer_prep_service; @@ -59,6 +60,7 @@ pub mod summaries_dag; pub mod sync_committee_rewards; pub mod sync_committee_verification; pub mod test_utils; +pub mod validator_custody; pub mod validator_monitor; pub mod validator_pubkey_cache; @@ -100,3 +102,4 @@ pub use state_processing::per_block_processing::errors::{ }; pub use store; pub use types; +pub use validator_custody::CustodyContext; diff --git a/beacon_node/beacon_chain/src/persisted_custody.rs b/beacon_node/beacon_chain/src/persisted_custody.rs new file mode 100644 index 0000000000..6ede473b36 --- /dev/null +++ b/beacon_node/beacon_chain/src/persisted_custody.rs @@ -0,0 +1,46 @@ +use crate::validator_custody::CustodyContextSsz; +use ssz::{Decode, Encode}; +use std::sync::Arc; +use store::{DBColumn, Error as StoreError, HotColdDB, ItemStore, StoreItem}; +use types::{EthSpec, Hash256}; + +/// 32-byte key for accessing the `CustodyContext`. All zero because `CustodyContext` has its own column. +pub const CUSTODY_DB_KEY: Hash256 = Hash256::ZERO; + +pub struct PersistedCustody(CustodyContextSsz); + +pub fn load_custody_context, Cold: ItemStore>( + store: Arc>, +) -> Option { + let res: Result, _> = + store.get_item::(&CUSTODY_DB_KEY); + // Load context from the store + match res { + Ok(Some(c)) => Some(c.0), + _ => None, + } +} + +/// Attempt to persist the custody context object to `self.store`. +pub fn persist_custody_context, Cold: ItemStore>( + store: Arc>, + custody_context: CustodyContextSsz, +) -> Result<(), store::Error> { + store.put_item(&CUSTODY_DB_KEY, &PersistedCustody(custody_context)) +} + +impl StoreItem for PersistedCustody { + fn db_column() -> DBColumn { + DBColumn::CustodyContext + } + + fn as_store_bytes(&self) -> Vec { + self.0.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + let custody_context = CustodyContextSsz::from_ssz_bytes(bytes)?; + + Ok(PersistedCustody(custody_context)) + } +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index db6968b662..c2c5d8d626 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -609,12 +609,6 @@ where let chain = builder.build().expect("should build"); - let sampling_column_count = if self.import_all_data_columns { - chain.spec.number_of_custody_groups as usize - } else { - chain.spec.custody_requirement as usize - }; - BeaconChainHarness { spec: chain.spec.clone(), chain: Arc::new(chain), @@ -625,7 +619,6 @@ where mock_execution_layer: self.mock_execution_layer, mock_builder: None, rng: make_rng(), - sampling_column_count, } } } @@ -682,7 +675,6 @@ pub struct BeaconChainHarness { pub mock_execution_layer: Option>, pub mock_builder: Option>>, - pub sampling_column_count: usize, pub rng: Mutex, } @@ -785,7 +777,10 @@ where } pub fn get_sampling_column_count(&self) -> usize { - self.sampling_column_count + self.chain + .data_availability_checker + .custody_context() + .sampling_size(None, &self.chain.spec) as usize } pub fn slots_per_epoch(&self) -> u64 { @@ -2360,7 +2355,7 @@ where .blob_kzg_commitments() .is_ok_and(|c| !c.is_empty()); if !has_blobs { - return RpcBlock::new_without_blobs(Some(block_root), block, 0); + return RpcBlock::new_without_blobs(Some(block_root), block); } // Blobs are stored as data columns from Fulu (PeerDAS) @@ -2370,14 +2365,8 @@ where .into_iter() .map(CustodyDataColumn::from_asserted_custody) .collect::>(); - RpcBlock::new_with_custody_columns( - Some(block_root), - block, - custody_columns, - self.get_sampling_column_count(), - &self.spec, - ) - .unwrap() + RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, &self.spec) + .unwrap() } else { let blobs = self.chain.get_blobs(&block_root).unwrap().blobs(); RpcBlock::new(Some(block_root), block, blobs).unwrap() @@ -2403,15 +2392,9 @@ where .take(sampling_column_count) .map(CustodyDataColumn::from_asserted_custody) .collect::>(); - RpcBlock::new_with_custody_columns( - Some(block_root), - block, - columns, - sampling_column_count, - &self.spec, - )? + RpcBlock::new_with_custody_columns(Some(block_root), block, columns, &self.spec)? } else { - RpcBlock::new_without_blobs(Some(block_root), block, 0) + RpcBlock::new_without_blobs(Some(block_root), block) } } else { let blobs = blob_items diff --git a/beacon_node/beacon_chain/src/validator_custody.rs b/beacon_node/beacon_chain/src/validator_custody.rs new file mode 100644 index 0000000000..160333b50e --- /dev/null +++ b/beacon_node/beacon_chain/src/validator_custody.rs @@ -0,0 +1,447 @@ +use std::{ + collections::{BTreeMap, HashMap}, + sync::atomic::{AtomicU64, Ordering}, +}; + +use parking_lot::RwLock; + +use ssz_derive::{Decode, Encode}; +use types::{ChainSpec, Epoch, EthSpec, Slot}; + +/// A delay before making the CGC change effective to the data availability checker. +const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30; + +type ValidatorsAndBalances = Vec<(usize, u64)>; + +/// This currently just registers increases in validator count. +/// Does not handle decreasing validator counts +#[derive(Default, Debug)] +struct ValidatorRegistrations { + /// Set of all validators that is registered to this node along with its effective balance + /// + /// Key is validator index and value is effective_balance. + validators: HashMap, + /// Maintains the validator custody requirement at a given epoch. + /// + /// Note: Only stores the epoch value when there's a change in custody requirement. + /// So if epoch 10 and 11 has the same custody requirement, only 10 is stored. + /// This map is never pruned, because currently we never decrease custody requirement, so this + /// map size is contained at 128. + epoch_validator_custody_requirements: BTreeMap, +} + +impl ValidatorRegistrations { + /// Returns the validator custody requirement at the latest epoch. + fn latest_validator_custody_requirement(&self) -> Option { + self.epoch_validator_custody_requirements + .last_key_value() + .map(|(_, v)| *v) + } + + /// Lookup the active custody requirement at the given epoch. + fn custody_requirement_at_epoch(&self, epoch: Epoch) -> Option { + self.epoch_validator_custody_requirements + .range(..=epoch) + .last() + .map(|(_, custody_count)| *custody_count) + } + + /// Register a new validator index and updates the list of validators if required. + /// Returns `Some((effective_epoch, new_cgc))` if the registration results in a CGC update. + pub(crate) fn register_validators( + &mut self, + validators_and_balance: ValidatorsAndBalances, + slot: Slot, + spec: &ChainSpec, + ) -> Option<(Epoch, u64)> { + for (validator_index, effective_balance) in validators_and_balance { + self.validators.insert(validator_index, effective_balance); + } + + // Each `BALANCE_PER_ADDITIONAL_CUSTODY_GROUP` effectively contributes one unit of "weight". + let validator_custody_units = + self.validators.values().sum::() / spec.balance_per_additional_custody_group; + let validator_custody_requirement = + get_validators_custody_requirement(validator_custody_units, spec); + + tracing::debug!( + validator_custody_units, + validator_custody_requirement, + "Registered validators" + ); + + // If registering the new validator increased the total validator "units", then + // add a new entry for the current epoch + if Some(validator_custody_requirement) != self.latest_validator_custody_requirement() { + // Apply the change from the next epoch after adding some delay buffer to ensure + // the node has enough time to subscribe to subnets etc, and to avoid having + // inconsistent column counts within an epoch. + let effective_delay_slots = + CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS / spec.seconds_per_slot; + let effective_epoch = (slot + effective_delay_slots).epoch(E::slots_per_epoch()) + 1; + self.epoch_validator_custody_requirements + .entry(effective_epoch) + .and_modify(|old_custody| *old_custody = validator_custody_requirement) + .or_insert(validator_custody_requirement); + Some((effective_epoch, validator_custody_requirement)) + } else { + None + } + } +} + +/// Given the `validator_custody_units`, return the custody requirement based on +/// the spec parameters. +/// +/// Note: a `validator_custody_units` here represents the number of 32 eth effective_balance +/// equivalent to `BALANCE_PER_ADDITIONAL_CUSTODY_GROUP`. +/// +/// For e.g. a validator with eb 32 eth is 1 unit. +/// a validator with eb 65 eth is 65 // 32 = 2 units. +/// +/// See https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/validator.md#validator-custody +fn get_validators_custody_requirement(validator_custody_units: u64, spec: &ChainSpec) -> u64 { + std::cmp::min( + std::cmp::max(validator_custody_units, spec.validator_custody_requirement), + spec.number_of_custody_groups, + ) +} + +/// Contains all the information the node requires to calculate the +/// number of columns to be custodied when checking for DA. +#[derive(Debug)] +pub struct CustodyContext { + /// The Number of custody groups required based on the number of validators + /// that is attached to this node. + /// + /// This is the number that we use to compute the custody group count that + /// we require for data availability check, and we use to advertise to our peers in the metadata + /// and enr values. + validator_custody_count: AtomicU64, + /// Is the node run as a supernode based on current cli parameters. + pub current_is_supernode: bool, + /// The persisted value for `is_supernode` based on the previous run of this node. + /// + /// Note: We require this value because if a user restarts the node with a higher cli custody + /// count value than in the previous run, then we should continue advertising the custody + /// count based on the old value than the new one since we haven't backfilled the required + /// columns. + persisted_is_supernode: bool, + /// Maintains all the validators that this node is connected to currently + validator_registrations: RwLock, +} + +impl CustodyContext { + /// Create a new custody default custody context object when no persisted object + /// exists. + /// + /// The `is_supernode` value is based on current cli parameters. + pub fn new(is_supernode: bool) -> Self { + Self { + validator_custody_count: AtomicU64::new(0), + current_is_supernode: is_supernode, + persisted_is_supernode: is_supernode, + validator_registrations: Default::default(), + } + } + + pub fn new_from_persisted_custody_context( + ssz_context: CustodyContextSsz, + is_supernode: bool, + ) -> Self { + CustodyContext { + validator_custody_count: AtomicU64::new(ssz_context.validator_custody_at_head), + current_is_supernode: is_supernode, + persisted_is_supernode: ssz_context.persisted_is_supernode, + validator_registrations: Default::default(), + } + } + + /// Register a new validator index and updates the list of validators if required. + /// + /// Also modifies the internal structures if the validator custody has changed to + /// update the `custody_column_count`. + /// + /// Returns `Some` along with the updated custody group count if it has changed, otherwise returns `None`. + pub fn register_validators( + &self, + validators_and_balance: ValidatorsAndBalances, + slot: Slot, + spec: &ChainSpec, + ) -> Option { + let Some((effective_epoch, new_validator_custody)) = self + .validator_registrations + .write() + .register_validators::(validators_and_balance, slot, spec) + else { + return None; + }; + + let current_cgc = self.custody_group_count_at_head(spec); + let validator_custody_count_at_head = self.validator_custody_count.load(Ordering::Relaxed); + + if new_validator_custody != validator_custody_count_at_head { + tracing::debug!( + old_count = validator_custody_count_at_head, + new_count = new_validator_custody, + "Validator count at head updated" + ); + self.validator_custody_count + .store(new_validator_custody, Ordering::Relaxed); + + let updated_cgc = self.custody_group_count_at_head(spec); + // Send the message to network only if there are more columns subnets to subscribe to + if updated_cgc > current_cgc { + tracing::debug!( + old_cgc = current_cgc, + updated_cgc, + "Custody group count updated" + ); + return Some(CustodyCountChanged { + new_custody_group_count: updated_cgc, + sampling_count: self.sampling_size(Some(effective_epoch), spec), + }); + } + } + + None + } + + /// This function is used to determine the custody group count at head ONLY. + /// Do NOT use this directly for data availability check, use `self.sampling_size` instead as + /// CGC can change over epochs. + pub fn custody_group_count_at_head(&self, spec: &ChainSpec) -> u64 { + if self.current_is_supernode { + return spec.number_of_custody_groups; + } + let validator_custody_count_at_head = self.validator_custody_count.load(Ordering::Relaxed); + + // If there are no validators, return the minimum custody_requirement + if validator_custody_count_at_head > 0 { + validator_custody_count_at_head + } else { + spec.custody_requirement + } + } + + /// Returns the count of custody columns this node must sample for a block at `epoch` to import. + /// If an `epoch` is not specified, returns the *current* validator custody requirement. + pub fn sampling_size(&self, epoch_opt: Option, spec: &ChainSpec) -> u64 { + let custody_group_count = if self.current_is_supernode { + spec.number_of_custody_groups + } else if let Some(epoch) = epoch_opt { + self.validator_registrations + .read() + .custody_requirement_at_epoch(epoch) + .unwrap_or(spec.custody_requirement) + } else { + self.custody_group_count_at_head(spec) + }; + + spec.sampling_size(custody_group_count) + .expect("should compute node sampling size from valid chain spec") + } +} + +/// The custody count changed because of a change in the +/// number of validators being managed. +pub struct CustodyCountChanged { + pub new_custody_group_count: u64, + pub sampling_count: u64, +} + +/// The custody information that gets persisted across runs. +#[derive(Debug, Encode, Decode, Clone)] +pub struct CustodyContextSsz { + validator_custody_at_head: u64, + persisted_is_supernode: bool, +} + +impl From<&CustodyContext> for CustodyContextSsz { + fn from(context: &CustodyContext) -> Self { + CustodyContextSsz { + validator_custody_at_head: context.validator_custody_count.load(Ordering::Relaxed), + persisted_is_supernode: context.persisted_is_supernode, + } + } +} + +#[cfg(test)] +mod tests { + use types::MainnetEthSpec; + + use super::*; + + type E = MainnetEthSpec; + + #[test] + fn no_validators_supernode_default() { + let custody_context = CustodyContext::new(true); + let spec = E::default_spec(); + assert_eq!( + custody_context.custody_group_count_at_head(&spec), + spec.number_of_custody_groups + ); + assert_eq!( + custody_context.sampling_size(None, &spec), + spec.number_of_custody_groups + ); + } + + #[test] + fn no_validators_fullnode_default() { + let custody_context = CustodyContext::new(false); + let spec = E::default_spec(); + assert_eq!( + custody_context.custody_group_count_at_head(&spec), + spec.custody_requirement, + "head custody count should be minimum spec custody requirement" + ); + assert_eq!( + custody_context.sampling_size(None, &spec), + spec.samples_per_slot + ); + } + + #[test] + fn register_single_validator_should_update_cgc() { + let custody_context = CustodyContext::new(false); + let spec = E::default_spec(); + let bal_per_additional_group = spec.balance_per_additional_custody_group; + let min_val_custody_requirement = spec.validator_custody_requirement; + // One single node increases its balance over 3 epochs. + let validators_and_expected_cgc_change = vec![ + ( + vec![(0, bal_per_additional_group)], + Some(min_val_custody_requirement), + ), + // No CGC change at 8 custody units, as it's the minimum requirement + (vec![(0, 8 * bal_per_additional_group)], None), + (vec![(0, 10 * bal_per_additional_group)], Some(10)), + ]; + + register_validators_and_assert_cgc( + &custody_context, + validators_and_expected_cgc_change, + &spec, + ); + } + + #[test] + fn register_multiple_validators_should_update_cgc() { + let custody_context = CustodyContext::new(false); + let spec = E::default_spec(); + let bal_per_additional_group = spec.balance_per_additional_custody_group; + let min_val_custody_requirement = spec.validator_custody_requirement; + // Add 3 validators over 3 epochs. + let validators_and_expected_cgc = vec![ + ( + vec![(0, bal_per_additional_group)], + Some(min_val_custody_requirement), + ), + ( + vec![ + (0, bal_per_additional_group), + (1, 7 * bal_per_additional_group), + ], + // No CGC change at 8 custody units, as it's the minimum requirement + None, + ), + ( + vec![ + (0, bal_per_additional_group), + (1, 7 * bal_per_additional_group), + (2, 2 * bal_per_additional_group), + ], + Some(10), + ), + ]; + + register_validators_and_assert_cgc(&custody_context, validators_and_expected_cgc, &spec); + } + + #[test] + fn register_validators_should_not_update_cgc_for_supernode() { + let custody_context = CustodyContext::new(true); + let spec = E::default_spec(); + let bal_per_additional_group = spec.balance_per_additional_custody_group; + + // Add 3 validators over 3 epochs. + let validators_and_expected_cgc = vec![ + (vec![(0, bal_per_additional_group)], None), + ( + vec![ + (0, bal_per_additional_group), + (1, 7 * bal_per_additional_group), + ], + None, + ), + ( + vec![ + (0, bal_per_additional_group), + (1, 7 * bal_per_additional_group), + (2, 2 * bal_per_additional_group), + ], + None, + ), + ]; + + register_validators_and_assert_cgc(&custody_context, validators_and_expected_cgc, &spec); + assert_eq!( + custody_context.sampling_size(None, &spec), + spec.number_of_custody_groups + ); + } + + #[test] + fn cgc_change_should_be_effective_to_sampling_after_delay() { + let custody_context = CustodyContext::new(false); + let spec = E::default_spec(); + let current_slot = Slot::new(10); + let current_epoch = current_slot.epoch(E::slots_per_epoch()); + let default_sampling_size = custody_context.sampling_size(None, &spec); + let validator_custody_units = 10; + + let _cgc_changed = custody_context.register_validators::( + vec![( + 0, + validator_custody_units * spec.balance_per_additional_custody_group, + )], + current_slot, + &spec, + ); + + // CGC update is not applied for `current_epoch`. + assert_eq!( + custody_context.sampling_size(Some(current_epoch), &spec), + default_sampling_size + ); + // CGC update is applied for the next epoch. + assert_eq!( + custody_context.sampling_size(Some(current_epoch + 1), &spec), + validator_custody_units + ); + } + + /// Update validator every epoch and assert cgc against expected values. + fn register_validators_and_assert_cgc( + custody_context: &CustodyContext, + validators_and_expected_cgc_changed: Vec<(ValidatorsAndBalances, Option)>, + spec: &ChainSpec, + ) { + for (idx, (validators_and_balance, expected_cgc_change)) in + validators_and_expected_cgc_changed.into_iter().enumerate() + { + let epoch = Epoch::new(idx as u64); + let updated_custody_count_opt = custody_context + .register_validators::( + validators_and_balance, + epoch.start_slot(E::slots_per_epoch()), + spec, + ) + .map(|c| c.new_custody_group_count); + + assert_eq!(updated_custody_count_opt, expected_cgc_change); + } + } +} diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 9225ffd9f4..3ff5f772aa 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -30,8 +30,6 @@ type E = MainnetEthSpec; const VALIDATOR_COUNT: usize = 24; const CHAIN_SEGMENT_LENGTH: usize = 64 * 5; const BLOCK_INDICES: &[usize] = &[0, 1, 32, 64, 68 + 1, 129, CHAIN_SEGMENT_LENGTH - 1]; -// Default custody group count for tests -const CGC: usize = 8; /// A cached set of keys. static KEYPAIRS: LazyLock> = @@ -144,10 +142,9 @@ fn build_rpc_block( RpcBlock::new(None, block, Some(blobs.clone())).unwrap() } Some(DataSidecars::DataColumns(columns)) => { - RpcBlock::new_with_custody_columns(None, block, columns.clone(), columns.len(), spec) - .unwrap() + RpcBlock::new_with_custody_columns(None, block, columns.clone(), spec).unwrap() } - None => RpcBlock::new_without_blobs(None, block, 0), + None => RpcBlock::new_without_blobs(None, block), } } @@ -370,7 +367,6 @@ async fn chain_segment_non_linear_parent_roots() { blocks[3] = RpcBlock::new_without_blobs( None, Arc::new(SignedBeaconBlock::from_block(block, signature)), - harness.sampling_column_count, ); assert!( @@ -408,7 +404,6 @@ async fn chain_segment_non_linear_slots() { blocks[3] = RpcBlock::new_without_blobs( None, Arc::new(SignedBeaconBlock::from_block(block, signature)), - harness.sampling_column_count, ); assert!( @@ -436,7 +431,6 @@ async fn chain_segment_non_linear_slots() { blocks[3] = RpcBlock::new_without_blobs( None, Arc::new(SignedBeaconBlock::from_block(block, signature)), - harness.sampling_column_count, ); assert!( @@ -578,11 +572,7 @@ async fn invalid_signature_gossip_block() { .into_block_error() .expect("should import all blocks prior to the one being tested"); let signed_block = SignedBeaconBlock::from_block(block, junk_signature()); - let rpc_block = RpcBlock::new_without_blobs( - None, - Arc::new(signed_block), - harness.sampling_column_count, - ); + let rpc_block = RpcBlock::new_without_blobs(None, Arc::new(signed_block)); let process_res = harness .chain .process_block( @@ -1002,7 +992,6 @@ async fn block_gossip_verification() { let (chain_segment, chain_segment_blobs) = get_chain_segment().await; let block_index = CHAIN_SEGMENT_LENGTH - 2; - let cgc = harness.chain.spec.custody_requirement as usize; harness .chain @@ -1016,7 +1005,7 @@ async fn block_gossip_verification() { { let gossip_verified = harness .chain - .verify_block_for_gossip(snapshot.beacon_block.clone(), get_cgc(&blobs_opt)) + .verify_block_for_gossip(snapshot.beacon_block.clone()) .await .expect("should obtain gossip verified block"); @@ -1058,7 +1047,7 @@ async fn block_gossip_verification() { *block.slot_mut() = expected_block_slot; assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), BlockError::FutureSlot { present_slot, block_slot, @@ -1092,7 +1081,7 @@ async fn block_gossip_verification() { *block.slot_mut() = expected_finalized_slot; assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), BlockError::WouldRevertFinalizedSlot { block_slot, finalized_slot, @@ -1122,10 +1111,10 @@ async fn block_gossip_verification() { unwrap_err( harness .chain - .verify_block_for_gossip( - Arc::new(SignedBeaconBlock::from_block(block, junk_signature())), - cgc - ) + .verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block( + block, + junk_signature() + )),) .await ), BlockError::InvalidSignature(InvalidSignature::ProposerSignature) @@ -1150,7 +1139,7 @@ async fn block_gossip_verification() { *block.parent_root_mut() = parent_root; assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), BlockError::ParentUnknown {parent_root: p} if p == parent_root ), @@ -1176,7 +1165,7 @@ async fn block_gossip_verification() { *block.parent_root_mut() = parent_root; assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), BlockError::NotFinalizedDescendant { block_parent_root } if block_parent_root == parent_root ), @@ -1213,7 +1202,7 @@ async fn block_gossip_verification() { ); assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone()), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone())).await), BlockError::IncorrectBlockProposer { block, local_shuffling, @@ -1225,7 +1214,7 @@ async fn block_gossip_verification() { // Check to ensure that we registered this is a valid block from this proposer. assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone()), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone())).await), BlockError::DuplicateImportStatusUnknown(_), ), "should register any valid signature against the proposer, even if the block failed later verification" @@ -1233,11 +1222,7 @@ async fn block_gossip_verification() { let block = chain_segment[block_index].beacon_block.clone(); assert!( - harness - .chain - .verify_block_for_gossip(block, cgc) - .await - .is_ok(), + harness.chain.verify_block_for_gossip(block).await.is_ok(), "the valid block should be processed" ); @@ -1255,7 +1240,7 @@ async fn block_gossip_verification() { matches!( harness .chain - .verify_block_for_gossip(block.clone(), cgc) + .verify_block_for_gossip(block.clone()) .await .expect_err("should error when processing known block"), BlockError::DuplicateImportStatusUnknown(_) @@ -1331,17 +1316,8 @@ async fn verify_block_for_gossip_slashing_detection() { let state = harness.get_current_state(); let ((block1, blobs1), _) = harness.make_block(state.clone(), Slot::new(1)).await; let ((block2, _blobs2), _) = harness.make_block(state, Slot::new(1)).await; - let cgc = if block1.fork_name_unchecked().fulu_enabled() { - harness.get_sampling_column_count() - } else { - 0 - }; - let verified_block = harness - .chain - .verify_block_for_gossip(block1, cgc) - .await - .unwrap(); + let verified_block = harness.chain.verify_block_for_gossip(block1).await.unwrap(); if let Some((kzg_proofs, blobs)) = blobs1 { harness @@ -1364,7 +1340,7 @@ async fn verify_block_for_gossip_slashing_detection() { ) .await .unwrap(); - unwrap_err(harness.chain.verify_block_for_gossip(block2, CGC).await); + unwrap_err(harness.chain.verify_block_for_gossip(block2).await); // Slasher should have been handed the two conflicting blocks and crafted a slashing. slasher.process_queued(Epoch::new(0)).unwrap(); @@ -1388,11 +1364,7 @@ async fn verify_block_for_gossip_doppelganger_detection() { .attestations() .map(|att| att.clone_as_attestation()) .collect::>(); - let verified_block = harness - .chain - .verify_block_for_gossip(block, CGC) - .await - .unwrap(); + let verified_block = harness.chain.verify_block_for_gossip(block).await.unwrap(); harness .chain .process_block( @@ -1539,7 +1511,7 @@ async fn add_base_block_to_altair_chain() { assert!(matches!( harness .chain - .verify_block_for_gossip(Arc::new(base_block.clone()), CGC) + .verify_block_for_gossip(Arc::new(base_block.clone())) .await .expect_err("should error when processing base block"), BlockError::InconsistentFork(InconsistentFork { @@ -1549,7 +1521,7 @@ async fn add_base_block_to_altair_chain() { )); // Ensure that it would be impossible to import via `BeaconChain::process_block`. - let base_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(base_block.clone()), 0); + let base_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(base_block.clone())); assert!(matches!( harness .chain @@ -1573,7 +1545,7 @@ async fn add_base_block_to_altair_chain() { harness .chain .process_chain_segment( - vec![RpcBlock::new_without_blobs(None, Arc::new(base_block), 0)], + vec![RpcBlock::new_without_blobs(None, Arc::new(base_block))], NotifyExecutionLayer::Yes, ) .await, @@ -1676,7 +1648,7 @@ async fn add_altair_block_to_base_chain() { assert!(matches!( harness .chain - .verify_block_for_gossip(Arc::new(altair_block.clone()), CGC) + .verify_block_for_gossip(Arc::new(altair_block.clone())) .await .expect_err("should error when processing altair block"), BlockError::InconsistentFork(InconsistentFork { @@ -1686,7 +1658,7 @@ async fn add_altair_block_to_base_chain() { )); // Ensure that it would be impossible to import via `BeaconChain::process_block`. - let altair_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(altair_block.clone()), 0); + let altair_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(altair_block.clone())); assert!(matches!( harness .chain @@ -1710,7 +1682,7 @@ async fn add_altair_block_to_base_chain() { harness .chain .process_chain_segment( - vec![RpcBlock::new_without_blobs(None, Arc::new(altair_block), 0)], + vec![RpcBlock::new_without_blobs(None, Arc::new(altair_block))], NotifyExecutionLayer::Yes ) .await, @@ -1771,11 +1743,7 @@ async fn import_duplicate_block_unrealized_justification() { // Create two verified variants of the block, representing the same block being processed in // parallel. let notify_execution_layer = NotifyExecutionLayer::Yes; - let rpc_block = RpcBlock::new_without_blobs( - Some(block_root), - block.clone(), - harness.sampling_column_count, - ); + let rpc_block = RpcBlock::new_without_blobs(Some(block_root), block.clone()); let verified_block1 = rpc_block .clone() .into_execution_pending_block(block_root, chain, notify_execution_layer) @@ -1846,14 +1814,3 @@ async fn import_execution_pending_block( } } } - -fn get_cgc(blobs_opt: &Option>) -> usize { - if let Some(data_sidecars) = blobs_opt.as_ref() { - match data_sidecars { - DataSidecars::Blobs(_) => 0, - DataSidecars::DataColumns(d) => d.len(), - } - } else { - 0 - } -} diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 6b9ff9d6ed..05fae7aa70 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -22,7 +22,6 @@ use task_executor::ShutdownReason; use types::*; const VALIDATOR_COUNT: usize = 32; -const CGC: usize = 8; type E = MainnetEthSpec; @@ -686,8 +685,7 @@ async fn invalidates_all_descendants() { assert_eq!(fork_parent_state.slot(), fork_parent_slot); let ((fork_block, _), _fork_post_state) = rig.harness.make_block(fork_parent_state, fork_slot).await; - let fork_rpc_block = - RpcBlock::new_without_blobs(None, fork_block.clone(), rig.harness.sampling_column_count); + let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); let fork_block_root = rig .harness .chain @@ -789,8 +787,7 @@ async fn switches_heads() { let ((fork_block, _), _fork_post_state) = rig.harness.make_block(fork_parent_state, fork_slot).await; let fork_parent_root = fork_block.parent_root(); - let fork_rpc_block = - RpcBlock::new_without_blobs(None, fork_block.clone(), rig.harness.sampling_column_count); + let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); let fork_block_root = rig .harness .chain @@ -1054,14 +1051,13 @@ async fn invalid_parent() { // Ensure the block built atop an invalid payload is invalid for gossip. assert!(matches!( - rig.harness.chain.clone().verify_block_for_gossip(block.clone(), CGC).await, + rig.harness.chain.clone().verify_block_for_gossip(block.clone()).await, Err(BlockError::ParentExecutionPayloadInvalid { parent_root: invalid_root }) if invalid_root == parent_root )); // Ensure the block built atop an invalid payload is invalid for import. - let rpc_block = - RpcBlock::new_without_blobs(None, block.clone(), rig.harness.sampling_column_count); + let rpc_block = RpcBlock::new_without_blobs(None, block.clone()); assert!(matches!( rig.harness.chain.process_block(rpc_block.block_root(), rpc_block, NotifyExecutionLayer::Yes, BlockImportSource::Lookup, || Ok(()), @@ -1385,8 +1381,7 @@ async fn recover_from_invalid_head_by_importing_blocks() { } = InvalidHeadSetup::new().await; // Import the fork block, it should become the head. - let fork_rpc_block = - RpcBlock::new_without_blobs(None, fork_block.clone(), rig.harness.sampling_column_count); + let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); rig.harness .chain .process_block( diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 51c7f0c289..d0f161ed56 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2644,11 +2644,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { assert_eq!(split.block_root, valid_fork_block.parent_root()); assert_ne!(split.state_root, unadvanced_split_state_root); - let invalid_fork_rpc_block = RpcBlock::new_without_blobs( - None, - invalid_fork_block.clone(), - harness.sampling_column_count, - ); + let invalid_fork_rpc_block = RpcBlock::new_without_blobs(None, invalid_fork_block.clone()); // Applying the invalid block should fail. let err = harness .chain @@ -2664,11 +2660,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { assert!(matches!(err, BlockError::WouldRevertFinalizedSlot { .. })); // Applying the valid block should succeed, but it should not become head. - let valid_fork_rpc_block = RpcBlock::new_without_blobs( - None, - valid_fork_block.clone(), - harness.sampling_column_count, - ); + let valid_fork_rpc_block = RpcBlock::new_without_blobs(None, valid_fork_block.clone()); harness .chain .process_block( diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index b220685b86..a4ec41ac06 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -48,8 +48,8 @@ use directory::DEFAULT_ROOT_DIR; use either::Either; use eth2::types::{ self as api_types, BroadcastValidation, ContextDeserialize, EndpointVersion, ForkChoice, - ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody, - ValidatorId, ValidatorStatus, ValidatorsRequestBody, + ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, StateId as CoreStateId, + ValidatorBalancesRequestBody, ValidatorId, ValidatorStatus, ValidatorsRequestBody, }; use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; use health_metrics::observe::Observe; @@ -3765,15 +3765,17 @@ pub fn serve( .and(warp::path::end()) .and(warp_utils::json::json()) .and(validator_subscription_tx_filter.clone()) + .and(network_tx_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .then( - |subscriptions: Vec, + |committee_subscriptions: Vec, validator_subscription_tx: Sender, + network_tx: UnboundedSender>, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P0, move || { - let subscriptions: std::collections::BTreeSet<_> = subscriptions + let subscriptions: std::collections::BTreeSet<_> = committee_subscriptions .iter() .map(|subscription| { chain @@ -3788,6 +3790,7 @@ pub fn serve( } }) .collect(); + let message = ValidatorSubscriptionMessage::AttestationSubscribe { subscriptions }; if let Err(e) = validator_subscription_tx.try_send(message) { @@ -3802,6 +3805,42 @@ pub fn serve( )); } + if chain.spec.is_peer_das_scheduled() { + let (finalized_beacon_state, _, _) = + StateId(CoreStateId::Finalized).state(&chain)?; + let validators_and_balances = committee_subscriptions + .iter() + .filter_map(|subscription| { + if let Ok(effective_balance) = finalized_beacon_state + .get_effective_balance(subscription.validator_index as usize) + { + Some((subscription.validator_index as usize, effective_balance)) + } else { + None + } + }) + .collect::>(); + + let current_slot = + chain.slot().map_err(warp_utils::reject::unhandled_error)?; + if let Some(cgc_change) = chain + .data_availability_checker + .custody_context() + .register_validators::( + validators_and_balances, + current_slot, + &chain.spec, + ) { + network_tx.send(NetworkMessage::CustodyCountChanged { + new_custody_group_count: cgc_change.new_custody_group_count, + sampling_count: cgc_change.sampling_count, + }).unwrap_or_else(|e| { + debug!(error = %e, "Could not send message to the network service. \ + Likely shutdown") + }); + } + } + Ok(()) }) }, diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 463f585f2c..75979bbb1d 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -138,8 +138,7 @@ pub async fn publish_block>( spawn_build_data_sidecar_task(chain.clone(), block.clone(), unverified_blobs)?; // Gossip verify the block and blobs/data columns separately. - let gossip_verified_block_result = unverified_block - .into_gossip_verified_block(&chain, network_globals.custody_columns_count() as usize); + let gossip_verified_block_result = unverified_block.into_gossip_verified_block(&chain); let block_root = block_root.unwrap_or_else(|| { gossip_verified_block_result.as_ref().map_or_else( |_| block.canonical_root(), @@ -224,7 +223,7 @@ pub async fn publish_block>( publish_column_sidecars(network_tx, &gossip_verified_columns, &chain).map_err(|_| { warp_utils::reject::custom_server_error("unable to publish data column sidecars".into()) })?; - let sampling_columns_indices = &network_globals.sampling_columns; + let sampling_columns_indices = &network_globals.sampling_columns(); let sampling_columns = gossip_verified_columns .into_iter() .flatten() @@ -303,11 +302,7 @@ pub async fn publish_block>( ); let import_result = Box::pin(chain.process_block( block_root, - RpcBlock::new_without_blobs( - Some(block_root), - block.clone(), - network_globals.custody_columns_count() as usize, - ), + RpcBlock::new_without_blobs(Some(block_root), block.clone()), NotifyExecutionLayer::Yes, BlockImportSource::HttpApi, publish_fn, diff --git a/beacon_node/http_api/tests/broadcast_validation_tests.rs b/beacon_node/http_api/tests/broadcast_validation_tests.rs index cd590580be..843242c22f 100644 --- a/beacon_node/http_api/tests/broadcast_validation_tests.rs +++ b/beacon_node/http_api/tests/broadcast_validation_tests.rs @@ -39,9 +39,6 @@ type E = MainnetEthSpec; * */ -// Default custody group count for tests -const CGC: usize = 8; - /// This test checks that a block that is **invalid** from a gossip perspective gets rejected when using `broadcast_validation=gossip`. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn gossip_invalid() { @@ -367,9 +364,9 @@ pub async fn consensus_partial_pass_only_consensus() { ); assert_ne!(block_a.state_root(), block_b.state_root()); - let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain, CGC); + let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain); assert!(gossip_block_b.is_ok()); - let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain, CGC); + let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain); assert!(gossip_block_a.is_err()); /* submit `block_b` which should induce equivocation */ @@ -657,10 +654,10 @@ pub async fn equivocation_consensus_late_equivocation() { ); assert_ne!(block_a.state_root(), block_b.state_root()); - let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain, CGC); + let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain); assert!(gossip_block_b.is_ok()); - let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain, CGC); + let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain); assert!(gossip_block_a.is_err()); let channel = tokio::sync::mpsc::unbounded_channel(); @@ -1294,9 +1291,9 @@ pub async fn blinded_equivocation_consensus_late_equivocation() { ProvenancedBlock::Builder(b, _, _) => b, }; - let gossip_block_b = GossipVerifiedBlock::new(inner_block_b, &tester.harness.chain, CGC); + let gossip_block_b = GossipVerifiedBlock::new(inner_block_b, &tester.harness.chain); assert!(gossip_block_b.is_ok()); - let gossip_block_a = GossipVerifiedBlock::new(inner_block_a, &tester.harness.chain, CGC); + let gossip_block_a = GossipVerifiedBlock::new(inner_block_a, &tester.harness.chain); assert!(gossip_block_a.is_err()); let channel = tokio::sync::mpsc::unbounded_channel(); @@ -1398,7 +1395,7 @@ pub async fn block_seen_on_gossip_without_blobs() { // Simulate the block being seen on gossip. block .clone() - .into_gossip_verified_block(&tester.harness.chain, CGC) + .into_gossip_verified_block(&tester.harness.chain) .unwrap(); // It should not yet be added to fork choice because blobs have not been seen. @@ -1467,7 +1464,7 @@ pub async fn block_seen_on_gossip_with_some_blobs() { // Simulate the block being seen on gossip. block .clone() - .into_gossip_verified_block(&tester.harness.chain, CGC) + .into_gossip_verified_block(&tester.harness.chain) .unwrap(); // Simulate some of the blobs being seen on gossip. @@ -1786,6 +1783,5 @@ fn get_custody_columns(tester: &InteractiveTester) -> HashSet { .network_globals .as_ref() .unwrap() - .sampling_columns - .clone() + .sampling_columns() } diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index ad54c6b8b1..ad4241c5b7 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -49,6 +49,7 @@ use tracing::{debug, error, info, trace, warn}; use types::{ChainSpec, EnrForkId, EthSpec}; mod subnet_predicate; +use crate::discovery::enr::PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY; pub use subnet_predicate::subnet_predicate; use types::non_zero_usize::new_non_zero_usize; @@ -476,6 +477,15 @@ impl Discovery { Ok(()) } + pub fn update_enr_cgc(&mut self, custody_group_count: u64) -> Result<(), String> { + self.discv5 + .enr_insert(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY, &custody_group_count) + .map_err(|e| format!("{:?}", e))?; + enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr()); + *self.network_globals.local_enr.write() = self.discv5.local_enr(); + Ok(()) + } + /// Adds/Removes a subnet from the ENR attnets/syncnets Bitfield pub fn update_enr_bitfield(&mut self, subnet: Subnet, value: bool) -> Result<(), String> { let local_enr = self.discv5.local_enr(); diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 23060df9e6..5f65a6c6d0 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -177,6 +177,7 @@ impl Network { pub async fn new( executor: task_executor::TaskExecutor, mut ctx: ServiceContext<'_>, + custody_group_count: u64, ) -> Result<(Self, Arc>), String> { let config = ctx.config.clone(); trace!("Libp2p Service starting"); @@ -201,11 +202,12 @@ impl Network { )?; // Construct the metadata - let custody_group_count = ctx.chain_spec.is_peer_das_scheduled().then(|| { - ctx.chain_spec - .custody_group_count(config.subscribe_all_data_column_subnets) - }); - let meta_data = utils::load_or_build_metadata(&config.network_dir, custody_group_count); + let custody_group_count_metadata = ctx + .chain_spec + .is_peer_das_scheduled() + .then_some(custody_group_count); + let meta_data = + utils::load_or_build_metadata(&config.network_dir, custody_group_count_metadata); let seq_number = *meta_data.seq_number(); let globals = NetworkGlobals::new( enr, @@ -885,6 +887,23 @@ impl Network { } } + /// Subscribe to all data columns determined by the cgc. + #[instrument(parent = None, + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip_all + )] + pub fn subscribe_new_data_column_subnets(&mut self, custody_column_count: u64) { + self.network_globals + .update_data_column_subnets(custody_column_count); + + for column in self.network_globals.sampling_subnets() { + let kind = GossipKind::DataColumnSidecar(column); + self.subscribe_kind(kind); + } + } + /// Returns the scoring parameters for a topic if set. #[instrument(parent = None, level = "trace", @@ -1254,6 +1273,21 @@ impl Network { self.update_metadata_bitfields(); } + /// Updates the cgc value in the ENR. + #[instrument(parent = None, + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip_all + )] + pub fn update_enr_cgc(&mut self, new_custody_group_count: u64) { + if let Err(e) = self.discovery_mut().update_enr_cgc(new_custody_group_count) { + crit!(error = e, "Could not update cgc in ENR"); + } + // update the local meta data which informs our peers of the update during PINGS + self.update_metadata_cgc(new_custody_group_count); + } + /// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we /// would like to retain the peers for. #[instrument(parent = None, @@ -1368,6 +1402,28 @@ impl Network { utils::save_metadata_to_disk(&self.network_dir, meta_data); } + #[instrument(parent = None, + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip_all + )] + fn update_metadata_cgc(&mut self, custody_group_count: u64) { + let mut meta_data_w = self.network_globals.local_metadata.write(); + + *meta_data_w.seq_number_mut() += 1; + if let Ok(cgc) = meta_data_w.custody_group_count_mut() { + *cgc = custody_group_count; + } + let seq_number = *meta_data_w.seq_number(); + let meta_data = meta_data_w.clone(); + + drop(meta_data_w); + self.eth2_rpc_mut().update_seq_number(seq_number); + // Save the updated metadata to disk + utils::save_metadata_to_disk(&self.network_dir, meta_data); + } + /// Sends a Ping request to the peer. #[instrument(parent = None, level = "trace", diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index fd99d93589..d1ed1c33b0 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -31,10 +31,8 @@ pub struct NetworkGlobals { /// The current state of the backfill sync. pub backfill_state: RwLock, /// The computed sampling subnets and columns is stored to avoid re-computing. - pub sampling_subnets: HashSet, - pub sampling_columns: HashSet, - /// Constant custody group count (CGC) set at startup - custody_group_count: u64, + pub sampling_subnets: RwLock>, + pub sampling_columns: RwLock>, /// Network-related configuration. Immutable after initialization. pub config: Arc, /// Ethereum chain configuration. Immutable after initialization. @@ -87,6 +85,13 @@ impl NetworkGlobals { sampling_columns.extend(columns); } + tracing::debug!( + cgc = custody_group_count, + ?sampling_columns, + ?sampling_subnets, + "Starting node with custody params" + ); + NetworkGlobals { local_enr: RwLock::new(enr.clone()), peer_id: RwLock::new(enr.peer_id()), @@ -96,14 +101,40 @@ impl NetworkGlobals { gossipsub_subscriptions: RwLock::new(HashSet::new()), sync_state: RwLock::new(SyncState::Stalled), backfill_state: RwLock::new(BackFillState::Paused), - sampling_subnets, - sampling_columns, - custody_group_count, + sampling_subnets: RwLock::new(sampling_subnets), + sampling_columns: RwLock::new(sampling_columns), config, spec, } } + /// Update the sampling subnets based on an updated cgc. + pub fn update_data_column_subnets(&self, custody_group_count: u64) { + // The below `expect` calls will panic on start up if the chain spec config values used + // are invalid + let sampling_size = self + .spec + .sampling_size(custody_group_count) + .expect("should compute node sampling size from valid chain spec"); + let custody_groups = + get_custody_groups(self.local_enr().node_id().raw(), sampling_size, &self.spec) + .expect("should compute node custody groups"); + + let mut sampling_subnets = self.sampling_subnets.write(); + for custody_index in &custody_groups { + let subnets = compute_subnets_from_custody_group(*custody_index, &self.spec) + .expect("should compute custody subnets for node"); + sampling_subnets.extend(subnets); + } + + let mut sampling_columns = self.sampling_columns.write(); + for custody_index in &custody_groups { + let columns = compute_columns_for_custody_group(*custody_index, &self.spec) + .expect("should compute custody columns for node"); + sampling_columns.extend(columns); + } + } + /// Returns the local ENR from the underlying Discv5 behaviour that external peers may connect /// to. pub fn local_enr(&self) -> Enr { @@ -120,19 +151,6 @@ impl NetworkGlobals { self.listen_multiaddrs.read().clone() } - /// Returns true if this node is configured as a PeerDAS supernode - pub fn is_supernode(&self) -> bool { - self.custody_group_count == self.spec.number_of_custody_groups - } - - /// Returns the count of custody columns this node must sample for block import - pub fn custody_columns_count(&self) -> u64 { - // This only panics if the chain spec contains invalid values - self.spec - .sampling_size(self.custody_group_count) - .expect("should compute node sampling size from valid chain spec") - } - /// Returns the number of libp2p connected peers. pub fn connected_peers(&self) -> usize { self.peers.read().connected_peer_ids().count() @@ -226,10 +244,18 @@ impl NetworkGlobals { enable_light_client_server: self.config.enable_light_client_server, subscribe_all_subnets: self.config.subscribe_all_subnets, subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets, - sampling_subnets: &self.sampling_subnets, + sampling_subnets: self.sampling_subnets.read().clone(), } } + pub fn sampling_columns(&self) -> HashSet { + self.sampling_columns.read().clone() + } + + pub fn sampling_subnets(&self) -> HashSet { + self.sampling_subnets.read().clone() + } + /// TESTING ONLY. Build a dummy NetworkGlobals instance. pub fn new_test_globals( trusted_peers: Vec, @@ -283,7 +309,7 @@ mod test { Arc::new(spec), ); assert_eq!( - globals.sampling_subnets.len(), + globals.sampling_subnets.read().len(), subnet_sampling_size as usize ); } @@ -306,7 +332,7 @@ mod test { Arc::new(spec), ); assert_eq!( - globals.sampling_columns.len(), + globals.sampling_columns.read().len(), subnet_sampling_size as usize ); } diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 56b97303d3..349bfe66a3 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -26,11 +26,11 @@ pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update"; pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; #[derive(Debug)] -pub struct TopicConfig<'a> { +pub struct TopicConfig { pub enable_light_client_server: bool, pub subscribe_all_subnets: bool, pub subscribe_all_data_column_subnets: bool, - pub sampling_subnets: &'a HashSet, + pub sampling_subnets: HashSet, } /// Returns all the topics the node should subscribe at `fork_name` @@ -85,7 +85,7 @@ pub fn core_topics_to_subscribe( topics.push(GossipKind::DataColumnSidecar(i.into())); } } else { - for subnet in opts.sampling_subnets { + for subnet in &opts.sampling_subnets { topics.push(GossipKind::DataColumnSidecar(*subnet)); } } @@ -126,7 +126,7 @@ pub fn all_topics_at_fork(fork: ForkName, spec: &ChainSpec) -> Vec(fork, &opts, spec) } @@ -521,7 +521,7 @@ mod tests { enable_light_client_server: false, subscribe_all_subnets: false, subscribe_all_data_column_subnets: false, - sampling_subnets, + sampling_subnets: sampling_subnets.clone(), } } diff --git a/beacon_node/lighthouse_network/tests/common.rs b/beacon_node/lighthouse_network/tests/common.rs index d979ef9265..0dac126909 100644 --- a/beacon_node/lighthouse_network/tests/common.rs +++ b/beacon_node/lighthouse_network/tests/common.rs @@ -118,6 +118,7 @@ pub async fn build_libp2p_instance( let (signal, exit) = async_channel::bounded(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx, service_name); + let custody_group_count = chain_spec.custody_requirement; let libp2p_context = lighthouse_network::Context { config, enr_fork_id: EnrForkId::default(), @@ -126,7 +127,7 @@ pub async fn build_libp2p_instance( libp2p_registry: None, }; Libp2pInstance( - LibP2PService::new(executor, libp2p_context) + LibP2PService::new(executor, libp2p_context, custody_group_count) .await .expect("should build libp2p instance") .0, diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index b129b54841..05c7dc287b 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -780,7 +780,7 @@ pub fn update_sync_metrics(network_globals: &Arc>) let all_column_subnets = (0..network_globals.spec.data_column_sidecar_subnet_count).map(DataColumnSubnetId::new); - let custody_column_subnets = network_globals.sampling_subnets.iter(); + let custody_column_subnets = network_globals.sampling_subnets(); // Iterate all subnet values to set to zero the empty entries in peers_per_column_subnet for subnet in all_column_subnets { @@ -794,7 +794,7 @@ pub fn update_sync_metrics(network_globals: &Arc>) // Registering this metric is a duplicate for supernodes but helpful for fullnodes. This way // operators can monitor the health of only the subnets of their interest without complex // Grafana queries. - for subnet in custody_column_subnets { + for subnet in custody_column_subnets.iter() { set_gauge_entry( &PEERS_PER_CUSTODY_COLUMN_SUBNET, &[&format!("{subnet}")], diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 8757ab4383..87f657f935 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1272,10 +1272,7 @@ impl NetworkBeaconProcessor { let verification_result = self .chain .clone() - .verify_block_for_gossip( - block.clone(), - self.network_globals.custody_columns_count() as usize, - ) + .verify_block_for_gossip(block.clone()) .await; if verification_result.is_ok() { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index f9390a2c7b..df9b656051 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -843,7 +843,7 @@ impl NetworkBeaconProcessor { block_root: Hash256, publish_blobs: bool, ) { - let custody_columns = self.network_globals.sampling_columns.clone(); + let custody_columns = self.network_globals.sampling_columns(); let self_cloned = self.clone(); let publish_fn = move |blobs_or_data_column| { if publish_blobs { @@ -930,7 +930,12 @@ impl NetworkBeaconProcessor { publish_columns: bool, ) -> Option { // Only supernodes attempt reconstruction - if !self.network_globals.is_supernode() { + if !self + .chain + .data_availability_checker + .custody_context() + .current_is_supernode + { return None; } diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index cb9c976404..f6a1069a7f 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -285,7 +285,7 @@ impl TestRig { ) .unwrap() .into_iter() - .filter(|c| network_globals.sampling_columns.contains(&c.index)) + .filter(|c| network_globals.sampling_columns().contains(&c.index)) .collect::>(); (None, Some(custody_columns)) @@ -371,22 +371,12 @@ impl TestRig { } } - pub fn custody_columns_count(&self) -> usize { - self.network_beacon_processor - .network_globals - .custody_columns_count() as usize - } - pub fn enqueue_rpc_block(&self) { let block_root = self.next_block.canonical_root(); self.network_beacon_processor .send_rpc_beacon_block( block_root, - RpcBlock::new_without_blobs( - Some(block_root), - self.next_block.clone(), - self.custody_columns_count(), - ), + RpcBlock::new_without_blobs(Some(block_root), self.next_block.clone()), std::time::Duration::default(), BlockProcessType::SingleBlock { id: 0 }, ) @@ -398,11 +388,7 @@ impl TestRig { self.network_beacon_processor .send_rpc_beacon_block( block_root, - RpcBlock::new_without_blobs( - Some(block_root), - self.next_block.clone(), - self.custody_columns_count(), - ), + RpcBlock::new_without_blobs(Some(block_root), self.next_block.clone()), std::time::Duration::default(), BlockProcessType::SingleBlock { id: 1 }, ) diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 77204b455d..c9f89ad668 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -10,6 +10,7 @@ use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconPro use futures::channel::mpsc::Sender; use futures::future::OptionFuture; use futures::prelude::*; + use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::RequestType; use lighthouse_network::service::Network; @@ -105,6 +106,12 @@ pub enum NetworkMessage { ConnectTrustedPeer(Enr), /// Disconnect from a trusted peer and remove it from the `trusted_peers` mapping. DisconnectTrustedPeer(Enr), + /// Custody group count changed due to a change in validators' weight. + /// Subscribe to new subnets and update ENR metadata. + CustodyCountChanged { + new_custody_group_count: u64, + sampling_count: u64, + }, } /// Messages triggered by validators that may trigger a subscription to a subnet. @@ -270,7 +277,15 @@ impl NetworkService { }; // launch libp2p service - let (mut libp2p, network_globals) = Network::new(executor.clone(), service_context).await?; + let (mut libp2p, network_globals) = Network::new( + executor.clone(), + service_context, + beacon_chain + .data_availability_checker + .custody_context() + .custody_group_count_at_head(&beacon_chain.spec), + ) + .await?; // Repopulate the DHT with stored ENR's if discovery is not disabled. if !config.disable_discovery { @@ -745,6 +760,15 @@ impl NetworkService { ); } } + NetworkMessage::CustodyCountChanged { + new_custody_group_count, + sampling_count, + } => { + // subscribe to `sampling_count` subnets + self.libp2p + .subscribe_new_data_column_subnets(sampling_count); + self.libp2p.update_enr_cgc(new_custody_group_count); + } } } diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 99428b0c80..0418ab4553 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -257,17 +257,11 @@ impl RangeBlockComponentsRequest { )); } - RpcBlock::new_with_custody_columns( - Some(block_root), - block, - custody_columns, - expects_custody_columns.len(), - spec, - ) - .map_err(|e| format!("{e:?}"))? + RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, spec) + .map_err(|e| format!("{e:?}"))? } else { // Block has no data, expects zero columns - RpcBlock::new_without_blobs(Some(block_root), block, 0) + RpcBlock::new_without_blobs(Some(block_root), block) }); } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 58641f8606..f6be39fa4a 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -476,7 +476,7 @@ impl SyncNetworkContext { // Attempt to find all required custody peers before sending any request or creating an ID let columns_by_range_peers_to_request = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let column_indexes = self.network_globals().sampling_columns.clone(); + let column_indexes = self.network_globals().sampling_columns(); Some(self.select_columns_by_range_peers_to_request( &column_indexes, peers, @@ -534,7 +534,7 @@ impl SyncNetworkContext { ( data_column_requests, self.network_globals() - .sampling_columns + .sampling_columns() .clone() .iter() .copied() @@ -928,8 +928,7 @@ impl SyncNetworkContext { // Include only the blob indexes not yet imported (received through gossip) let custody_indexes_to_fetch = self .network_globals() - .sampling_columns - .clone() + .sampling_columns() .into_iter() .filter(|index| !custody_indexes_imported.contains(index)) .collect::>(); @@ -1487,11 +1486,7 @@ impl SyncNetworkContext { .beacon_processor_if_enabled() .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; - let block = RpcBlock::new_without_blobs( - Some(block_root), - block, - self.network_globals().custody_columns_count() as usize, - ); + let block = RpcBlock::new_without_blobs(Some(block_root), block); debug!(block = ?block_root, id, "Sending block for processing"); // Lookup sync event safety: If `beacon_processor.send_rpc_beacon_block` returns Ok() sync diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index be01734417..6100d322b8 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1032,7 +1032,7 @@ impl SyncingChain { // Require peers on all sampling column subnets before sending batches let peers_on_all_custody_subnets = network .network_globals() - .sampling_subnets + .sampling_subnets() .iter() .all(|subnet_id| { let peer_count = network diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index e7e6ff5970..a2c359c87e 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1205,12 +1205,8 @@ impl TestRig { payload_verification_status: PayloadVerificationStatus::Verified, is_valid_merge_transition_block: false, }; - let executed_block = AvailabilityPendingExecutedBlock::new( - block, - import_data, - payload_verification_outcome, - self.network_globals.custody_columns_count() as usize, - ); + let executed_block = + AvailabilityPendingExecutedBlock::new(block, import_data, payload_verification_outcome); match self .harness .chain diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 932f485dd0..c114eca555 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -449,18 +449,10 @@ fn build_rpc_block( RpcBlock::new(None, block, Some(blobs.clone())).unwrap() } Some(DataSidecars::DataColumns(columns)) => { - RpcBlock::new_with_custody_columns( - None, - block, - columns.clone(), - // TODO(das): Assumes CGC = max value. Change if we want to do more complex tests - columns.len(), - spec, - ) - .unwrap() + RpcBlock::new_with_custody_columns(None, block, columns.clone(), spec).unwrap() } // Block has no data, expects zero columns - None => RpcBlock::new_without_blobs(None, block, 0), + None => RpcBlock::new_without_blobs(None, block), } } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 5b30971fd8..eda57b7d8b 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -339,6 +339,8 @@ pub enum DBColumn { BeaconRandaoMixes, #[strum(serialize = "dht")] DhtEnrs, + #[strum(serialize = "cus")] + CustodyContext, /// DEPRECATED. For Optimistically Imported Merge Transition Blocks #[strum(serialize = "otb")] OptimisticTransitionBlock, @@ -397,6 +399,7 @@ impl DBColumn { | Self::PubkeyCache | Self::BeaconRestorePoint | Self::DhtEnrs + | Self::CustodyContext | Self::OptimisticTransitionBlock => 32, Self::BeaconBlockRoots | Self::BeaconBlockRootsChunked diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 59472e2edc..b4fd5afe87 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -203,6 +203,8 @@ pub struct ChainSpec { pub data_column_sidecar_subnet_count: u64, pub samples_per_slot: u64, pub custody_requirement: u64, + pub validator_custody_requirement: u64, + pub balance_per_additional_custody_group: u64, /* * Networking @@ -731,14 +733,6 @@ impl ChainSpec { Ok(std::cmp::max(custody_column_count, self.samples_per_slot)) } - pub fn custody_group_count(&self, is_supernode: bool) -> u64 { - if is_supernode { - self.number_of_custody_groups - } else { - self.custody_requirement - } - } - pub fn all_data_column_sidecar_subnets(&self) -> impl Iterator { (0..self.data_column_sidecar_subnet_count).map(DataColumnSubnetId::new) } @@ -975,6 +969,8 @@ impl ChainSpec { data_column_sidecar_subnet_count: 128, number_of_columns: 128, samples_per_slot: 8, + validator_custody_requirement: 8, + balance_per_additional_custody_group: 32000000000, /* * Network specific @@ -1309,6 +1305,8 @@ impl ChainSpec { data_column_sidecar_subnet_count: 128, number_of_columns: 128, samples_per_slot: 8, + validator_custody_requirement: 8, + balance_per_additional_custody_group: 32000000000, /* * Network specific @@ -1650,6 +1648,12 @@ pub struct Config { #[serde(default = "BlobSchedule::default")] #[serde(skip_serializing_if = "BlobSchedule::is_empty")] blob_schedule: BlobSchedule, + #[serde(default = "default_validator_custody_requirement")] + #[serde(with = "serde_utils::quoted_u64")] + validator_custody_requirement: u64, + #[serde(default = "default_balance_per_additional_custody_group")] + #[serde(with = "serde_utils::quoted_u64")] + balance_per_additional_custody_group: u64, } fn default_bellatrix_fork_version() -> [u8; 4] { @@ -1815,6 +1819,14 @@ const fn default_samples_per_slot() -> u64 { 8 } +const fn default_validator_custody_requirement() -> u64 { + 8 +} + +const fn default_balance_per_additional_custody_group() -> u64 { + 32000000000 +} + fn max_blocks_by_root_request_common(max_request_blocks: u64) -> usize { let max_request_blocks = max_request_blocks as usize; RuntimeVariableList::::from_vec( @@ -2024,6 +2036,8 @@ impl Config { samples_per_slot: spec.samples_per_slot, custody_requirement: spec.custody_requirement, blob_schedule: spec.blob_schedule.clone(), + validator_custody_requirement: spec.validator_custody_requirement, + balance_per_additional_custody_group: spec.balance_per_additional_custody_group, } } @@ -2103,6 +2117,8 @@ impl Config { samples_per_slot, custody_requirement, ref blob_schedule, + validator_custody_requirement, + balance_per_additional_custody_group, } = self; if preset_base != E::spec_name().to_string().as_str() { @@ -2187,6 +2203,8 @@ impl Config { samples_per_slot, custody_requirement, blob_schedule: blob_schedule.clone(), + validator_custody_requirement, + balance_per_additional_custody_group, ..chain_spec.clone() }) diff --git a/scripts/local_testnet/network_params_das.yaml b/scripts/local_testnet/network_params_das.yaml index b16be34b89..c896b11330 100644 --- a/scripts/local_testnet/network_params_das.yaml +++ b/scripts/local_testnet/network_params_das.yaml @@ -2,7 +2,7 @@ participants: - cl_type: lighthouse cl_image: lighthouse:local el_type: geth - el_image: ethpandaops/geth:fusaka-devnet-0 + el_image: ethpandaops/geth:fusaka-devnet-1 cl_extra_params: - --subscribe-all-data-column-subnets - --subscribe-all-subnets @@ -13,7 +13,7 @@ participants: - cl_type: lighthouse cl_image: lighthouse:local el_type: geth - el_image: ethpandaops/geth:fusaka-devnet-0 + el_image: ethpandaops/geth:fusaka-devnet-1 cl_extra_params: # Note: useful for testing range sync (only produce block if the node is in sync to prevent forking) - --sync-tolerance-epochs=0 diff --git a/scripts/tests/checkpoint-sync-config-devnet.yaml b/scripts/tests/checkpoint-sync-config-devnet.yaml index c536d26b3b..de3020a884 100644 --- a/scripts/tests/checkpoint-sync-config-devnet.yaml +++ b/scripts/tests/checkpoint-sync-config-devnet.yaml @@ -3,18 +3,18 @@ participants: - cl_type: lighthouse cl_image: lighthouse:local el_type: geth - el_image: ethpandaops/geth:fusaka-devnet-0 + el_image: ethpandaops/geth:fusaka-devnet-1 supernode: true - cl_type: lighthouse cl_image: lighthouse:local el_type: geth - el_image: ethpandaops/geth:fusaka-devnet-0 + el_image: ethpandaops/geth:fusaka-devnet-1 supernode: false checkpoint_sync_enabled: true -checkpoint_sync_url: "https://checkpoint-sync.fusaka-devnet-0.ethpandaops.io" +checkpoint_sync_url: "https://checkpoint-sync.fusaka-devnet-1.ethpandaops.io" global_log_level: debug network_params: - network: fusaka-devnet-0 + network: fusaka-devnet-1 diff --git a/scripts/tests/genesis-sync-config-fulu.yaml b/scripts/tests/genesis-sync-config-fulu.yaml index ccdc09c0d3..91aa4d1ffd 100644 --- a/scripts/tests/genesis-sync-config-fulu.yaml +++ b/scripts/tests/genesis-sync-config-fulu.yaml @@ -2,19 +2,26 @@ participants: - cl_type: lighthouse cl_image: lighthouse:local + el_type: geth + el_image: ethpandaops/geth:fusaka-devnet-1 count: 2 # nodes without validators, used for testing sync. - cl_type: lighthouse cl_image: lighthouse:local + el_type: geth + el_image: ethpandaops/geth:fusaka-devnet-1 supernode: true validator_count: 0 - cl_type: lighthouse cl_image: lighthouse:local + el_type: geth + el_image: ethpandaops/geth:fusaka-devnet-1 supernode: false validator_count: 0 network_params: seconds_per_slot: 6 - fulu_fork_epoch: 0 + electra_fork_epoch: 0 + fulu_fork_epoch: 1 preset: "minimal" additional_services: - tx_fuzz diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index b507383190..af3b0bce2d 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -520,7 +520,7 @@ impl Tester { let result: Result, _> = self .block_on_dangerous(self.harness.chain.process_block( block_root, - RpcBlock::new_without_blobs(Some(block_root), block.clone(), 0), + RpcBlock::new_without_blobs(Some(block_root), block.clone()), NotifyExecutionLayer::Yes, BlockImportSource::Lookup, || Ok(()),