diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 04e8a534da..de4fd29409 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -33,6 +33,7 @@ jobs:
arch: [aarch64-unknown-linux-gnu,
x86_64-unknown-linux-gnu,
x86_64-apple-darwin,
+ aarch64-apple-darwin,
x86_64-windows]
include:
- arch: aarch64-unknown-linux-gnu
@@ -44,6 +45,9 @@ jobs:
- arch: x86_64-apple-darwin
runner: macos-13
profile: maxperf
+ - arch: aarch64-apple-darwin
+ runner: macos-14
+ profile: maxperf
- arch: x86_64-windows
runner: ${{ github.repository == 'sigp/lighthouse' && fromJson('["self-hosted", "windows", "release"]') || 'windows-2019' }}
profile: maxperf
@@ -94,6 +98,10 @@ jobs:
if: matrix.arch == 'x86_64-apple-darwin'
run: cargo install --path lighthouse --force --locked --features portable,gnosis --profile ${{ matrix.profile }}
+ - name: Build Lighthouse for aarch64-apple-darwin
+ if: matrix.arch == 'aarch64-apple-darwin'
+ run: cargo install --path lighthouse --force --locked --features portable,gnosis --profile ${{ matrix.profile }}
+
- name: Build Lighthouse for Windows
if: matrix.arch == 'x86_64-windows'
run: cargo install --path lighthouse --force --locked --features portable,gnosis --profile ${{ matrix.profile }}
@@ -237,6 +245,7 @@ jobs:
| System | Architecture | Binary | PGP Signature |
|:---:|:---:|:---:|:---|
|
| x86_64 | [lighthouse-${{ env.VERSION }}-x86_64-apple-darwin.tar.gz](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-apple-darwin.tar.gz) | [PGP Signature](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-apple-darwin.tar.gz.asc) |
+ |
| aarch64 | [lighthouse-${{ env.VERSION }}-aarch64-apple-darwin.tar.gz](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-aarch64-apple-darwin.tar.gz) | [PGP Signature](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-aarch64-apple-darwin.tar.gz.asc) |
|
| x86_64 | [lighthouse-${{ env.VERSION }}-x86_64-unknown-linux-gnu.tar.gz](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-unknown-linux-gnu.tar.gz) | [PGP Signature](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-unknown-linux-gnu.tar.gz.asc) |
|
| aarch64 | [lighthouse-${{ env.VERSION }}-aarch64-unknown-linux-gnu.tar.gz](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-aarch64-unknown-linux-gnu.tar.gz) | [PGP Signature](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-aarch64-unknown-linux-gnu.tar.gz.asc) |
|
| x86_64 | [lighthouse-${{ env.VERSION }}-x86_64-windows.tar.gz](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-windows.tar.gz) | [PGP Signature](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-windows.tar.gz.asc) |
diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml
index 817fd9524d..64a93ab5ae 100644
--- a/.github/workflows/test-suite.yml
+++ b/.github/workflows/test-suite.yml
@@ -295,7 +295,7 @@ jobs:
with:
channel: stable
cache-target: release
- - name: Run a basic beacon chain sim that starts from Bellatrix
+ - name: Run a basic beacon chain sim that starts from Deneb
run: cargo run --release --bin simulator basic-sim
fallback-simulator-ubuntu:
name: fallback-simulator-ubuntu
diff --git a/Cargo.lock b/Cargo.lock
index dd0c367692..ff87c32783 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -813,6 +813,7 @@ dependencies = [
"maplit",
"merkle_proof",
"metrics",
+ "once_cell",
"oneshot_broadcast",
"operation_pool",
"parking_lot 0.12.3",
@@ -878,14 +879,13 @@ name = "beacon_node_fallback"
version = "0.1.0"
dependencies = [
"clap",
- "environment",
"eth2",
"futures",
"itertools 0.10.5",
- "logging",
"serde",
"slot_clock",
"strum",
+ "task_executor",
"tokio",
"tracing",
"types",
@@ -2363,6 +2363,7 @@ dependencies = [
"tokio",
"tracing",
"types",
+ "validator_store",
]
[[package]]
@@ -5634,6 +5635,32 @@ dependencies = [
"unused_port",
]
+[[package]]
+name = "lighthouse_validator_store"
+version = "0.1.0"
+dependencies = [
+ "account_utils",
+ "beacon_node_fallback",
+ "doppelganger_service",
+ "either",
+ "environment",
+ "eth2",
+ "futures",
+ "initialized_validators",
+ "logging",
+ "parking_lot 0.12.3",
+ "serde",
+ "signing_method",
+ "slashing_protection",
+ "slot_clock",
+ "task_executor",
+ "tokio",
+ "tracing",
+ "types",
+ "validator_metrics",
+ "validator_store",
+]
+
[[package]]
name = "lighthouse_version"
version = "0.1.0"
@@ -5731,8 +5758,6 @@ dependencies = [
"chrono",
"logroller",
"metrics",
- "once_cell",
- "parking_lot 0.12.3",
"serde",
"serde_json",
"tokio",
@@ -9668,6 +9693,7 @@ dependencies = [
"graffiti_file",
"hyper 1.6.0",
"initialized_validators",
+ "lighthouse_validator_store",
"metrics",
"monitoring_api",
"parking_lot 0.12.3",
@@ -9723,6 +9749,7 @@ dependencies = [
"health_metrics",
"initialized_validators",
"itertools 0.10.5",
+ "lighthouse_validator_store",
"lighthouse_version",
"logging",
"parking_lot 0.12.3",
@@ -9755,6 +9782,7 @@ name = "validator_http_metrics"
version = "0.1.0"
dependencies = [
"health_metrics",
+ "lighthouse_validator_store",
"lighthouse_version",
"logging",
"malloc_utils",
@@ -9766,7 +9794,6 @@ dependencies = [
"types",
"validator_metrics",
"validator_services",
- "validator_store",
"warp",
"warp_utils",
]
@@ -9809,9 +9836,7 @@ version = "0.1.0"
dependencies = [
"beacon_node_fallback",
"bls",
- "doppelganger_service",
"either",
- "environment",
"eth2",
"futures",
"graffiti_file",
@@ -9832,19 +9857,8 @@ dependencies = [
name = "validator_store"
version = "0.1.0"
dependencies = [
- "account_utils",
- "doppelganger_service",
- "initialized_validators",
- "logging",
- "parking_lot 0.12.3",
- "serde",
- "signing_method",
"slashing_protection",
- "slot_clock",
- "task_executor",
- "tracing",
"types",
- "validator_metrics",
]
[[package]]
@@ -10102,6 +10116,7 @@ dependencies = [
"eth2_network_config",
"futures",
"initialized_validators",
+ "lighthouse_validator_store",
"logging",
"parking_lot 0.12.3",
"reqwest",
diff --git a/Cargo.toml b/Cargo.toml
index 31f50068dc..075552b281 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -96,11 +96,11 @@ members = [
"validator_client/http_api",
"validator_client/http_metrics",
"validator_client/initialized_validators",
+ "validator_client/lighthouse_validator_store",
"validator_client/signing_method",
"validator_client/slashing_protection",
"validator_client/validator_metrics",
"validator_client/validator_services",
- "validator_client/validator_store",
"validator_manager",
]
@@ -161,6 +161,7 @@ maplit = "1"
milhouse = "0.5"
mockito = "1.5.0"
num_cpus = "1"
+once_cell = "1.17.1"
parking_lot = "0.12"
paste = "1"
prometheus = { version = "0.13", default-features = false }
@@ -227,7 +228,6 @@ compare_fields = { path = "common/compare_fields" }
deposit_contract = { path = "common/deposit_contract" }
directory = { path = "common/directory" }
doppelganger_service = { path = "validator_client/doppelganger_service" }
-validator_services = { path = "validator_client/validator_services" }
environment = { path = "lighthouse/environment" }
eth1 = { path = "beacon_node/eth1" }
eth1_test_rig = { path = "testing/eth1_test_rig" }
@@ -249,6 +249,7 @@ int_to_bytes = { path = "consensus/int_to_bytes" }
kzg = { path = "crypto/kzg" }
metrics = { path = "common/metrics" }
lighthouse_network = { path = "beacon_node/lighthouse_network" }
+lighthouse_validator_store = { path = "validator_client/lighthouse_validator_store" }
lighthouse_version = { path = "common/lighthouse_version" }
workspace_members = { path = "common/workspace_members" }
lockfile = { path = "common/lockfile" }
@@ -280,6 +281,7 @@ validator_dir = { path = "common/validator_dir" }
validator_http_api = { path = "validator_client/http_api" }
validator_http_metrics = { path = "validator_client/http_metrics" }
validator_metrics = { path = "validator_client/validator_metrics" }
+validator_services = { path = "validator_client/validator_services" }
validator_store = { path = "validator_client/validator_store" }
validator_test_rig = { path = "testing/validator_test_rig" }
warp_utils = { path = "common/warp_utils" }
diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml
index 0cf9ae1a10..18b40cab7e 100644
--- a/beacon_node/beacon_chain/Cargo.toml
+++ b/beacon_node/beacon_chain/Cargo.toml
@@ -47,6 +47,7 @@ logging = { workspace = true }
lru = { workspace = true }
merkle_proof = { workspace = true }
metrics = { workspace = true }
+once_cell = { workspace = true }
oneshot_broadcast = { path = "../../common/oneshot_broadcast/" }
operation_pool = { workspace = true }
parking_lot = { workspace = true }
diff --git a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs
index 567433caee..56b13b0b77 100644
--- a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs
+++ b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs
@@ -11,10 +11,12 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use fork_choice::ExecutionStatus;
use lru::LruCache;
+use once_cell::sync::OnceCell;
use smallvec::SmallVec;
use state_processing::state_advance::partial_state_advance;
use std::cmp::Ordering;
use std::num::NonZeroUsize;
+use std::sync::Arc;
use types::non_zero_usize::new_non_zero_usize;
use types::{
BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned,
@@ -39,21 +41,21 @@ pub struct Proposer {
/// their signatures.
pub struct EpochBlockProposers {
/// The epoch to which the proposers pertain.
- epoch: Epoch,
+ pub(crate) epoch: Epoch,
/// The fork that should be used to verify proposer signatures.
- fork: Fork,
+ pub(crate) fork: Fork,
/// A list of length `T::EthSpec::slots_per_epoch()`, representing the proposers for each slot
/// in that epoch.
///
/// E.g., if `self.epoch == 1`, then `self.proposers[0]` contains the proposer for slot `32`.
- proposers: SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>,
+ pub(crate) proposers: SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>,
}
/// A cache to store the proposers for some epoch.
///
/// See the module-level documentation for more information.
pub struct BeaconProposerCache {
- cache: LruCache<(Epoch, Hash256), EpochBlockProposers>,
+ cache: LruCache<(Epoch, Hash256), Arc<OnceCell<EpochBlockProposers>>>,
}
impl Default for BeaconProposerCache {
@@ -74,7 +76,8 @@ impl BeaconProposerCache {
) -> Option<Proposer> {
let epoch = slot.epoch(E::slots_per_epoch());
let key = (epoch, shuffling_decision_block);
- if let Some(cache) = self.cache.get(&key) {
+ let cache_opt = self.cache.get(&key).and_then(|cell| cell.get());
+ if let Some(cache) = cache_opt {
// This `if` statement is likely unnecessary, but it feels like good practice.
if epoch == cache.epoch {
cache
@@ -103,7 +106,26 @@ impl BeaconProposerCache {
epoch: Epoch,
) -> Option<&SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>> {
let key = (epoch, shuffling_decision_block);
- self.cache.get(&key).map(|cache| &cache.proposers)
+ self.cache
+ .get(&key)
+ .and_then(|cache_once_cell| cache_once_cell.get().map(|proposers| &proposers.proposers))
+ }
+
+ /// Returns the `OnceCell` for the given `(epoch, shuffling_decision_block)` key,
+ /// inserting an empty one if it doesn't exist.
+ ///
+ /// The returned `OnceCell` allows the caller to initialise the value externally
+ /// using `get_or_try_init`, enabling deferred computation without holding a mutable
+ /// reference to the cache.
+ pub fn get_or_insert_key(
+ &mut self,
+ epoch: Epoch,
+ shuffling_decision_block: Hash256,
+ ) -> Arc<OnceCell<EpochBlockProposers>> {
+ let key = (epoch, shuffling_decision_block);
+ self.cache
+ .get_or_insert(key, || Arc::new(OnceCell::new()))
+ .clone()
}
/// Insert the proposers into the cache.
@@ -120,14 +142,13 @@ impl BeaconProposerCache {
) -> Result<(), BeaconStateError> {
let key = (epoch, shuffling_decision_block);
if !self.cache.contains(&key) {
- self.cache.put(
- key,
- EpochBlockProposers {
- epoch,
- fork,
- proposers: proposers.into(),
- },
- );
+ let epoch_proposers = EpochBlockProposers {
+ epoch,
+ fork,
+ proposers: proposers.into(),
+ };
+ self.cache
+ .put(key, Arc::new(OnceCell::with_value(epoch_proposers)));
}
Ok(())
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
index f5fd24483a..5b5a6fcc0d 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -331,7 +331,7 @@ impl PendingComponents {
format!(
"block {} blobs {}/{}",
block_count,
- self.verified_blobs.len(),
+ self.verified_blobs.iter().flatten().count(),
num_expected_blobs
)
}
diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs
index 57efbb0a77..20b5c9aa02 100644
--- a/beacon_node/beacon_chain/src/data_column_verification.rs
+++ b/beacon_node/beacon_chain/src/data_column_verification.rs
@@ -1,3 +1,4 @@
+use crate::beacon_proposer_cache::EpochBlockProposers;
use crate::block_verification::{
cheap_state_advance_to_obtain_committees, get_validator_pubkey_cache, process_block_slash_info,
BlockSlashInfo,
@@ -9,7 +10,6 @@ use derivative::Derivative;
use fork_choice::ProtoBlock;
use kzg::{Error as KzgError, Kzg};
use proto_array::Block;
-use slasher::test_utils::E;
use slot_clock::SlotClock;
use ssz_derive::{Decode, Encode};
use std::iter;
@@ -588,28 +588,33 @@ fn verify_proposer_and_signature(
chain: &BeaconChain<T>,
) -> Result<(), GossipDataColumnError> {
let column_slot = data_column.slot();
- let column_epoch = column_slot.epoch(E::slots_per_epoch());
+ let slots_per_epoch = T::EthSpec::slots_per_epoch();
+ let column_epoch = column_slot.epoch(slots_per_epoch);
let column_index = data_column.index;
let block_root = data_column.block_root();
let block_parent_root = data_column.block_parent_root();
- let proposer_shuffling_root =
- if parent_block.slot.epoch(T::EthSpec::slots_per_epoch()) == column_epoch {
- parent_block
- .next_epoch_shuffling_id
- .shuffling_decision_block
- } else {
- parent_block.root
- };
+ let proposer_shuffling_root = if parent_block.slot.epoch(slots_per_epoch) == column_epoch {
+ parent_block
+ .next_epoch_shuffling_id
+ .shuffling_decision_block
+ } else {
+ parent_block.root
+ };
- let proposer_opt = chain
+ // We lock the cache briefly to get or insert a OnceCell, then drop the lock
+ // before doing proposer shuffling calculation via `OnceCell::get_or_try_init`. This avoids
+ // holding the lock during the computation, while still ensuring the result is cached and
+ // initialised only once.
+ //
+ // This approach exposes the cache internals (`OnceCell` & `EpochBlockProposers`)
+ // as a trade-off for avoiding lock contention.
+ let epoch_proposers_cell = chain
.beacon_proposer_cache
.lock()
- .get_slot::<T::EthSpec>(proposer_shuffling_root, column_slot);
+ .get_or_insert_key(column_epoch, proposer_shuffling_root);
- let (proposer_index, fork) = if let Some(proposer) = proposer_opt {
- (proposer.index, proposer.fork)
- } else {
+ let epoch_proposers = epoch_proposers_cell.get_or_try_init(move || {
debug!(
%block_root,
index = %column_index,
@@ -633,19 +638,20 @@ fn verify_proposer_and_signature(
)?;
let proposers = state.get_beacon_proposer_indices(&chain.spec)?;
- let proposer_index = *proposers
- .get(column_slot.as_usize() % T::EthSpec::slots_per_epoch() as usize)
- .ok_or_else(|| BeaconChainError::NoProposerForSlot(column_slot))?;
-
// Prime the proposer shuffling cache with the newly-learned value.
- chain.beacon_proposer_cache.lock().insert(
- column_epoch,
- proposer_shuffling_root,
- proposers,
- state.fork(),
- )?;
- (proposer_index, state.fork())
- };
+ Ok::<_, GossipDataColumnError>(EpochBlockProposers {
+ epoch: column_epoch,
+ fork: state.fork(),
+ proposers: proposers.into(),
+ })
+ })?;
+
+ let proposer_index = *epoch_proposers
+ .proposers
+ .get(column_slot.as_usize() % slots_per_epoch as usize)
+ .ok_or_else(|| BeaconChainError::NoProposerForSlot(column_slot))?;
+
+ let fork = epoch_proposers.fork;
// Signature verify the signed block header.
let signature_is_valid = {
diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs
index eaaa23130d..704fb3663f 100644
--- a/beacon_node/beacon_chain/src/kzg_utils.rs
+++ b/beacon_node/beacon_chain/src/kzg_utils.rs
@@ -8,9 +8,9 @@ use std::sync::Arc;
use types::beacon_block_body::KzgCommitments;
use types::data_column_sidecar::{Cell, DataColumn, DataColumnSidecarError};
use types::{
- Blob, BlobSidecar, BlobSidecarList, ChainSpec, ColumnIndex, DataColumnSidecar,
- DataColumnSidecarList, EthSpec, Hash256, KzgCommitment, KzgProof, SignedBeaconBlock,
- SignedBeaconBlockHeader, SignedBlindedBeaconBlock,
+ Blob, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList,
+ EthSpec, Hash256, KzgCommitment, KzgProof, SignedBeaconBlock, SignedBeaconBlockHeader,
+ SignedBlindedBeaconBlock,
};
/// Converts a blob ssz List object to an array to be used with the kzg
@@ -79,38 +79,27 @@ pub fn validate_data_columns<'a, E: EthSpec, I>(
where
I: Iterator<Item = &'a Arc<DataColumnSidecar<E>>> + Clone,
{
- let cells = data_column_iter
- .clone()
- .flat_map(|data_column| data_column.column.iter().map(ssz_cell_to_crypto_cell::<E>))
- .collect::<Result<Vec<_>, KzgError>>()?;
+ let mut cells = Vec::new();
+ let mut proofs = Vec::new();
+ let mut column_indices = Vec::new();
+ let mut commitments = Vec::new();
- let proofs = data_column_iter
- .clone()
- .flat_map(|data_column| {
- data_column
- .kzg_proofs
- .iter()
- .map(|&proof| Bytes48::from(proof))
- })
- .collect::<Vec<_>>();
+ for data_column in data_column_iter {
+ let col_index = data_column.index;
- let column_indices = data_column_iter
- .clone()
- .flat_map(|data_column| {
- let col_index = data_column.index;
- data_column.column.iter().map(move |_| col_index)
- })
- .collect::<Vec<_>>();
+ for cell in &data_column.column {
+ cells.push(ssz_cell_to_crypto_cell::<E>(cell)?);
+ column_indices.push(col_index);
+ }
- let commitments = data_column_iter
- .clone()
- .flat_map(|data_column| {
- data_column
- .kzg_commitments
- .iter()
- .map(|&commitment| Bytes48::from(commitment))
- })
- .collect::<Vec<_>>();
+ for &proof in &data_column.kzg_proofs {
+ proofs.push(Bytes48::from(proof));
+ }
+
+ for &commitment in &data_column.kzg_commitments {
+ commitments.push(Bytes48::from(commitment));
+ }
+ }
kzg.verify_cell_proof_batch(&cells, &proofs, column_indices, &commitments)
}
diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
index 083887046a..95a4e82fa2 100644
--- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
+++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
@@ -1,6 +1,8 @@
use crate::discovery::enr::PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY;
use crate::discovery::{peer_id_to_node_id, CombinedKey};
-use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId};
+use crate::{
+ metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId, SyncInfo,
+};
use itertools::Itertools;
use logging::crit;
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
@@ -15,7 +17,7 @@ use std::{
use sync_status::SyncStatus;
use tracing::{debug, error, trace, warn};
use types::data_column_custody_group::compute_subnets_for_node;
-use types::{ChainSpec, DataColumnSubnetId, EthSpec};
+use types::{ChainSpec, DataColumnSubnetId, Epoch, EthSpec, Hash256, Slot};
pub mod client;
pub mod peer_info;
@@ -735,6 +737,19 @@ impl PeerDB {
},
);
+ self.update_sync_status(
+ &peer_id,
+ SyncStatus::Synced {
+ // Fill in mock SyncInfo, only for the peer to return `is_synced() == true`.
+ info: SyncInfo {
+ head_slot: Slot::new(0),
+ head_root: Hash256::ZERO,
+ finalized_epoch: Epoch::new(0),
+ finalized_root: Hash256::ZERO,
+ },
+ },
+ );
+
if supernode {
let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
let all_subnets = (0..spec.data_column_sidecar_subnet_count)
diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs
index 3031a0dff7..fd99d93589 100644
--- a/beacon_node/lighthouse_network/src/types/globals.rs
+++ b/beacon_node/lighthouse_network/src/types/globals.rs
@@ -206,6 +206,20 @@ impl NetworkGlobals {
.collect::<Vec<_>>()
}
+ /// Returns true if the peer is known and is a custodian of `column_index`
+ pub fn is_custody_peer_of(&self, column_index: ColumnIndex, peer_id: &PeerId) -> bool {
+ self.peers
+ .read()
+ .peer_info(peer_id)
+ .map(|info| {
+ info.is_assigned_to_custody_subnet(&DataColumnSubnetId::from_column_index(
+ column_index,
+ &self.spec,
+ ))
+ })
+ .unwrap_or(false)
+ }
+
/// Returns the TopicConfig to compute the set of Gossip topics for a given fork
pub fn as_topic_config(&self) -> TopicConfig {
TopicConfig {
diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs
index d61ea58377..cf0e98cda8 100644
--- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs
@@ -1160,7 +1160,8 @@ impl NetworkBeaconProcessor {
"Processed data column, waiting for other components"
);
- self.attempt_data_column_reconstruction(block_root).await;
+ self.attempt_data_column_reconstruction(block_root, true)
+ .await;
}
},
Err(BlockError::DuplicateFullyImported(_)) => {
diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs
index 9a8edbfa4c..ba681eed14 100644
--- a/beacon_node/network/src/network_beacon_processor/mod.rs
+++ b/beacon_node/network/src/network_beacon_processor/mod.rs
@@ -918,9 +918,13 @@ impl NetworkBeaconProcessor {
///
/// Returns `Some(AvailabilityProcessingStatus)` if reconstruction is successfully performed,
/// otherwise returns `None`.
+ ///
+ /// The `publish_columns` parameter controls whether reconstructed columns should be published
+ /// to the gossip network.
async fn attempt_data_column_reconstruction(
self: &Arc<Self>,
block_root: Hash256,
+ publish_columns: bool,
) -> Option<AvailabilityProcessingStatus> {
// Only supernodes attempt reconstruction
if !self.network_globals.is_supernode() {
@@ -930,7 +934,9 @@ impl NetworkBeaconProcessor {
let result = self.chain.reconstruct_data_columns(block_root).await;
match result {
Ok(Some((availability_processing_status, data_columns_to_publish))) => {
- self.publish_data_columns_gradually(data_columns_to_publish, block_root);
+ if publish_columns {
+ self.publish_data_columns_gradually(data_columns_to_publish, block_root);
+ }
match &availability_processing_status {
AvailabilityProcessingStatus::Imported(hash) => {
debug!(
@@ -1141,7 +1147,7 @@ use {
};
#[cfg(test)]
-type TestBeaconChainType<E> =
+pub(crate) type TestBeaconChainType<E> =
Witness<ManualSlotClock, CachingEth1Backend<E>, E, MemoryStore<E>, MemoryStore<E>>;
#[cfg(test)]
diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs
index 48ae26c826..31b17a41a4 100644
--- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs
@@ -383,8 +383,12 @@ impl NetworkBeaconProcessor {
);
// Attempt reconstruction here before notifying sync, to avoid sending out more requests
// that we may no longer need.
- if let Some(availability) =
- self.attempt_data_column_reconstruction(block_root).await
+ // We don't publish columns reconstructed from rpc columns to the gossip network,
+ // as these are likely historic columns.
+ let publish_columns = false;
+ if let Some(availability) = self
+ .attempt_data_column_reconstruction(block_root, publish_columns)
+ .await
{
result = Ok(availability)
}
diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs
index 509caf7316..fcef06271f 100644
--- a/beacon_node/network/src/sync/backfill_sync/mod.rs
+++ b/beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -10,7 +10,9 @@
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::manager::BatchProcessResult;
-use crate::sync::network_context::{RangeRequestId, RpcResponseError, SyncNetworkContext};
+use crate::sync::network_context::{
+ RangeRequestId, RpcRequestSendError, RpcResponseError, SyncNetworkContext,
+};
use crate::sync::range_sync::{
BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
};
@@ -20,10 +22,9 @@ use lighthouse_network::service::api_types::Id;
use lighthouse_network::types::{BackFillState, NetworkGlobals};
use lighthouse_network::{PeerAction, PeerId};
use logging::crit;
-use rand::seq::SliceRandom;
use std::collections::{
btree_map::{BTreeMap, Entry},
- HashMap, HashSet,
+ HashSet,
};
use std::sync::Arc;
use tracing::{debug, error, info, instrument, warn};
@@ -121,9 +122,6 @@ pub struct BackFillSync {
/// Sorted map of batches undergoing some kind of processing.
batches: BTreeMap<BatchId, BatchInfo<T::EthSpec, BackFillBatchConfig>>,
- /// List of peers we are currently awaiting a response for.
- active_requests: HashMap<PeerId, HashSet<BatchId>>,
-
/// The current processing batch, if any.
current_processing_batch: Option,
@@ -176,7 +174,6 @@ impl BackFillSync {
let bfs = BackFillSync {
batches: BTreeMap::new(),
- active_requests: HashMap::new(),
processing_target: current_start,
current_start,
last_batch_downloaded: false,
@@ -314,45 +311,11 @@ impl BackFillSync {
skip_all
)]
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
- pub fn peer_disconnected(
- &mut self,
- peer_id: &PeerId,
- network: &mut SyncNetworkContext<T>,
- ) -> Result<(), BackFillError> {
+ pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
if matches!(self.state(), BackFillState::Failed) {
return Ok(());
}
- if let Some(batch_ids) = self.active_requests.remove(peer_id) {
- // fail the batches.
- for id in batch_ids {
- if let Some(batch) = self.batches.get_mut(&id) {
- match batch.download_failed(false) {
- Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
- self.fail_sync(BackFillError::BatchDownloadFailed(id))?;
- }
- Ok(BatchOperationOutcome::Continue) => {}
- Err(e) => {
- self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?;
- }
- }
- // If we have run out of peers in which to retry this batch, the backfill state
- // transitions to a paused state.
- // We still need to reset the state for all the affected batches, so we should not
- // short circuit early.
- if self.retry_batch_download(network, id).is_err() {
- debug!(
- batch_id = %id,
- error = "no synced peers",
- "Batch could not be retried"
- );
- }
- } else {
- debug!(peer = %peer_id, batch = %id, "Batch not found while removing peer");
- }
- }
- }
-
// Remove the peer from the participation list
self.participating_peers.remove(peer_id);
Ok(())
@@ -386,15 +349,12 @@ impl BackFillSync {
return Ok(());
}
debug!(batch_epoch = %batch_id, error = ?err, "Batch download failed");
- if let Some(active_requests) = self.active_requests.get_mut(peer_id) {
- active_requests.remove(&batch_id);
- }
- match batch.download_failed(true) {
+ match batch.download_failed(Some(*peer_id)) {
Err(e) => self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)),
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))
}
- Ok(BatchOperationOutcome::Continue) => self.retry_batch_download(network, batch_id),
+ Ok(BatchOperationOutcome::Continue) => self.send_batch(network, batch_id),
}
} else {
// this could be an error for an old batch, removed when the chain advances
@@ -435,19 +395,11 @@ impl BackFillSync {
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer, and that the
// request_id matches
- // TODO(das): removed peer_id matching as the node may request a different peer for data
- // columns.
if !batch.is_expecting_block(&request_id) {
return Ok(ProcessResult::Successful);
}
- // A stream termination has been sent. This batch has ended. Process a completed batch.
- // Remove the request from the peer's active batches
- self.active_requests
- .get_mut(peer_id)
- .map(|active_requests| active_requests.remove(&batch_id));
-
- match batch.download_completed(blocks) {
+ match batch.download_completed(blocks, *peer_id) {
Ok(received) => {
let awaiting_batches =
self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH;
@@ -488,7 +440,6 @@ impl BackFillSync {
self.set_state(BackFillState::Failed);
// Remove all batches and active requests and participating peers.
self.batches.clear();
- self.active_requests.clear();
self.participating_peers.clear();
self.restart_failed_sync = false;
@@ -622,7 +573,7 @@ impl BackFillSync {
}
};
- let Some(peer) = batch.current_peer() else {
+ let Some(peer) = batch.processing_peer() else {
self.fail_sync(BackFillError::BatchInvalidState(
batch_id,
String::from("Peer does not exist"),
@@ -698,6 +649,8 @@ impl BackFillSync {
);
for peer in self.participating_peers.drain() {
+ // TODO(das): `participating_peers` only includes block peers. Should we
+ // penalize the custody column peers too?
network.report_peer(peer, *penalty, "backfill_batch_failed");
}
self.fail_sync(BackFillError::BatchProcessingFailed(batch_id))
@@ -723,7 +676,7 @@ impl BackFillSync {
{
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
}
- self.retry_batch_download(network, batch_id)?;
+ self.send_batch(network, batch_id)?;
Ok(ProcessResult::Successful)
}
}
@@ -864,12 +817,7 @@ impl BackFillSync {
}
}
}
- BatchState::Downloading(peer, ..) => {
- // remove this batch from the peer's active requests
- if let Some(active_requests) = self.active_requests.get_mut(peer) {
- active_requests.remove(&id);
- }
- }
+ BatchState::Downloading(..) => {}
BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => {
crit!("batch indicates inconsistent chain state while advancing chain")
}
@@ -951,57 +899,10 @@ impl BackFillSync {
self.processing_target = self.current_start;
for id in redownload_queue {
- self.retry_batch_download(network, id)?;
+ self.send_batch(network, id)?;
}
// finally, re-request the failed batch.
- self.retry_batch_download(network, batch_id)
- }
-
- /// Sends and registers the request of a batch awaiting download.
- #[instrument(parent = None,
- level = "info",
- fields(service = "backfill_sync"),
- name = "backfill_sync",
- skip_all
- )]
- fn retry_batch_download(
- &mut self,
- network: &mut SyncNetworkContext<T>,
- batch_id: BatchId,
- ) -> Result<(), BackFillError> {
- let Some(batch) = self.batches.get_mut(&batch_id) else {
- return Ok(());
- };
-
- // Find a peer to request the batch
- let failed_peers = batch.failed_peers();
-
- let new_peer = self
- .network_globals
- .peers
- .read()
- .synced_peers()
- .map(|peer| {
- (
- failed_peers.contains(peer),
- self.active_requests.get(peer).map(|v| v.len()).unwrap_or(0),
- rand::random::<u32>(),
- *peer,
- )
- })
- // Sort peers prioritizing unrelated peers with less active requests.
- .min()
- .map(|(_, _, _, peer)| peer);
-
- if let Some(peer) = new_peer {
- self.participating_peers.insert(peer);
- self.send_batch(network, batch_id, peer)
- } else {
- // If we are here the chain has no more synced peers
- info!(reason = "insufficient_synced_peers", "Backfill sync paused");
- self.set_state(BackFillState::Paused);
- Err(BackFillError::Paused)
- }
+ self.send_batch(network, batch_id)
}
/// Requests the batch assigned to the given id from a given peer.
@@ -1015,53 +916,65 @@ impl BackFillSync {
&mut self,
network: &mut SyncNetworkContext,
batch_id: BatchId,
- peer: PeerId,
) -> Result<(), BackFillError> {
if let Some(batch) = self.batches.get_mut(&batch_id) {
+ let synced_peers = self
+ .network_globals
+ .peers
+ .read()
+ .synced_peers()
+ .cloned()
+ .collect::<HashSet<_>>();
+
let (request, is_blob_batch) = batch.to_blocks_by_range_request();
+ let failed_peers = batch.failed_peers();
match network.block_components_by_range_request(
- peer,
is_blob_batch,
request,
RangeRequestId::BackfillSync { batch_id },
+ &synced_peers,
+ &failed_peers,
) {
Ok(request_id) => {
// inform the batch about the new request
- if let Err(e) = batch.start_downloading_from_peer(peer, request_id) {
+ if let Err(e) = batch.start_downloading(request_id) {
return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0));
}
debug!(epoch = %batch_id, %batch, "Requesting batch");
- // register the batch for this peer
- self.active_requests
- .entry(peer)
- .or_default()
- .insert(batch_id);
return Ok(());
}
- Err(e) => {
- // NOTE: under normal conditions this shouldn't happen but we handle it anyway
- warn!(%batch_id, error = ?e, %batch,"Could not send batch request");
- // register the failed download and check if the batch can be retried
- if let Err(e) = batch.start_downloading_from_peer(peer, 1) {
- return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0));
+ Err(e) => match e {
+ RpcRequestSendError::NoPeer(no_peer) => {
+ // If we are here the chain has no more synced peers
+ info!(
+ "reason" = format!("insufficient_synced_peers({no_peer:?})"),
+ "Backfill sync paused"
+ );
+ self.set_state(BackFillState::Paused);
+ return Err(BackFillError::Paused);
}
- self.active_requests
- .get_mut(&peer)
- .map(|request| request.remove(&batch_id));
+ RpcRequestSendError::InternalError(e) => {
+ // NOTE: under normal conditions this shouldn't happen but we handle it anyway
+ warn!(%batch_id, error = ?e, %batch,"Could not send batch request");
+ // register the failed download and check if the batch can be retried
+ if let Err(e) = batch.start_downloading(1) {
+ return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0));
+ }
- match batch.download_failed(true) {
- Err(e) => {
- self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?
- }
- Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
- self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))?
- }
- Ok(BatchOperationOutcome::Continue) => {
- return self.retry_batch_download(network, batch_id)
+ match batch.download_failed(None) {
+ Err(e) => {
+ self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?
+ }
+ Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
+ self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))?
+ }
+ Ok(BatchOperationOutcome::Continue) => {
+ return self.send_batch(network, batch_id)
+ }
}
}
- }
+ },
}
}
@@ -1093,7 +1006,7 @@ impl BackFillSync {
.collect::<Vec<_>>();
for batch_id in batch_ids_to_retry {
- self.retry_batch_download(network, batch_id)?;
+ self.send_batch(network, batch_id)?;
}
Ok(())
}
@@ -1115,34 +1028,16 @@ impl BackFillSync {
}
// find the next pending batch and request it from the peer
-
- // randomize the peers for load balancing
- let mut rng = rand::thread_rng();
- let mut idle_peers = self
- .network_globals
- .peers
- .read()
- .synced_peers()
- .filter(|peer_id| {
- self.active_requests
- .get(peer_id)
- .map(|requests| requests.is_empty())
- .unwrap_or(true)
- })
- .cloned()
- .collect::<Vec<_>>();
-
- idle_peers.shuffle(&mut rng);
-
- while let Some(peer) = idle_peers.pop() {
- if let Some(batch_id) = self.include_next_batch(network) {
- // send the batch
- self.send_batch(network, batch_id, peer)?;
- } else {
- // No more batches, simply stop
- return Ok(());
- }
+ // Note: for this function to not infinite loop we must:
+ // - If `include_next_batch` returns Some we MUST increase the count of batches that are
+ // accounted in the `BACKFILL_BATCH_BUFFER_SIZE` limit in the `matches!` statement of
+ // that function.
+ while let Some(batch_id) = self.include_next_batch(network) {
+ // send the batch
+ self.send_batch(network, batch_id)?;
}
+
+ // No more batches, simply stop
Ok(())
}
@@ -1296,3 +1191,73 @@ enum ResetEpochError {
/// The chain has already completed.
SyncCompleted,
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use beacon_chain::test_utils::BeaconChainHarness;
+ use bls::Hash256;
+ use lighthouse_network::{NetworkConfig, SyncInfo, SyncStatus};
+ use rand::prelude::StdRng;
+ use rand::SeedableRng;
+ use types::MinimalEthSpec;
+
+ #[test]
+ fn request_batches_should_not_loop_infinitely() {
+ let harness = BeaconChainHarness::builder(MinimalEthSpec)
+ .default_spec()
+ .deterministic_keypairs(4)
+ .fresh_ephemeral_store()
+ .build();
+
+ let beacon_chain = harness.chain.clone();
+ let slots_per_epoch = MinimalEthSpec::slots_per_epoch();
+
+ let network_globals = Arc::new(NetworkGlobals::new_test_globals(
+ vec![],
+ Arc::new(NetworkConfig::default()),
+ beacon_chain.spec.clone(),
+ ));
+
+ {
+ let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
+ let peer_id = network_globals
+ .peers
+ .write()
+ .__add_connected_peer_testing_only(
+ true,
+ &beacon_chain.spec,
+ k256::ecdsa::SigningKey::random(&mut rng).into(),
+ );
+
+ // Simulate finalized epoch and head being 2 epochs ahead
+ let finalized_epoch = Epoch::new(40);
+ let head_epoch = finalized_epoch + 2;
+ let head_slot = head_epoch.start_slot(slots_per_epoch) + 1;
+
+ network_globals.peers.write().update_sync_status(
+ &peer_id,
+ SyncStatus::Synced {
+ info: SyncInfo {
+ head_slot,
+ head_root: Hash256::random(),
+ finalized_epoch,
+ finalized_root: Hash256::random(),
+ },
+ },
+ );
+ }
+
+ let mut network = SyncNetworkContext::new_for_testing(
+ beacon_chain.clone(),
+ network_globals.clone(),
+ harness.runtime.task_executor.clone(),
+ );
+
+ let mut backfill = BackFillSync::new(beacon_chain, network_globals);
+ backfill.set_state(BackFillState::Syncing);
+
+ // if this ends up running into an infinite loop, the test will overflow the stack pretty quickly.
+ let _ = backfill.request_batches(&mut network);
+ }
+}
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index 84e492c04f..9119b1652c 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -515,9 +515,7 @@ impl SyncManager {
// Remove peer from all data structures
self.range_sync.peer_disconnect(&mut self.network, peer_id);
- let _ = self
- .backfill_sync
- .peer_disconnected(peer_id, &mut self.network);
+ let _ = self.backfill_sync.peer_disconnected(peer_id);
self.block_lookups.peer_disconnected(peer_id);
// Regardless of the outcome, we update the sync status.
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index 2cb5ec9a0a..d9eda651e7 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -9,6 +9,8 @@ use super::range_sync::ByRangeRequestType;
use super::SyncMessage;
use crate::metrics;
use crate::network_beacon_processor::NetworkBeaconProcessor;
+#[cfg(test)]
+use crate::network_beacon_processor::TestBeaconChainType;
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::SingleLookupId;
@@ -27,18 +29,20 @@ use lighthouse_network::service::api_types::{
};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource};
use parking_lot::RwLock;
-use rand::prelude::IteratorRandom;
-use rand::thread_rng;
pub use requests::LookupVerifyError;
use requests::{
ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems,
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
};
+#[cfg(test)]
+use slot_clock::SlotClock;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
+#[cfg(test)]
+use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tracing::{debug, error, span, warn, Level};
use types::blob_sidecar::FixedBlobSidecarList;
@@ -82,24 +86,18 @@ pub enum RpcResponseError {
#[derive(Debug, PartialEq, Eq)]
pub enum RpcRequestSendError {
- /// Network channel send failed
- NetworkSendError,
- NoCustodyPeers,
- CustodyRequestError(custody::Error),
- SlotClockError,
+ /// No peer available matching the required criteria
+ NoPeer(NoPeerError),
+ /// These errors should never happen, including unreachable custody errors or network send
+ /// errors.
+ InternalError(String),
}
-impl std::fmt::Display for RpcRequestSendError {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- match self {
- RpcRequestSendError::NetworkSendError => write!(f, "Network send error"),
- RpcRequestSendError::NoCustodyPeers => write!(f, "No custody peers"),
- RpcRequestSendError::CustodyRequestError(e) => {
- write!(f, "Custody request error: {:?}", e)
- }
- RpcRequestSendError::SlotClockError => write!(f, "Slot clock error"),
- }
- }
+/// Type of peer missing that caused a `RpcRequestSendError::NoPeers`
+#[derive(Debug, PartialEq, Eq)]
+pub enum NoPeerError {
+ BlockPeer,
+ CustodyPeer(ColumnIndex),
}
#[derive(Debug, PartialEq, Eq)]
@@ -232,6 +230,35 @@ pub enum RangeBlockComponent {
),
}
+#[cfg(test)]
+impl SyncNetworkContext> {
+ pub fn new_for_testing(
+ beacon_chain: Arc>>,
+ network_globals: Arc>,
+ task_executor: TaskExecutor,
+ ) -> Self {
+ let fork_context = Arc::new(ForkContext::new::(
+ beacon_chain.slot_clock.now().unwrap_or(Slot::new(0)),
+ beacon_chain.genesis_validators_root,
+ &beacon_chain.spec,
+ ));
+ let (network_tx, _network_rx) = mpsc::unbounded_channel();
+ let (beacon_processor, _) = NetworkBeaconProcessor::null_for_testing(
+ network_globals,
+ mpsc::unbounded_channel().0,
+ beacon_chain.clone(),
+ task_executor,
+ );
+
+ SyncNetworkContext::new(
+ network_tx,
+ Arc::new(beacon_processor),
+ beacon_chain,
+ fork_context,
+ )
+ }
+}
+
impl SyncNetworkContext {
pub fn new(
network_send: mpsc::UnboundedSender>,
@@ -331,12 +358,6 @@ impl SyncNetworkContext {
.custody_peers_for_column(column_index)
}
- pub fn get_random_custodial_peer(&self, column_index: ColumnIndex) -> Option {
- self.get_custodial_peers(column_index)
- .into_iter()
- .choose(&mut thread_rng())
- }
-
pub fn network_globals(&self) -> &NetworkGlobals {
&self.network_beacon_processor.network_globals
}
@@ -381,34 +402,102 @@ impl SyncNetworkContext {
}
}
+ fn active_request_count_by_peer(&self) -> HashMap<PeerId, usize> {
+ let Self {
+ network_send: _,
+ request_id: _,
+ blocks_by_root_requests,
+ blobs_by_root_requests,
+ data_columns_by_root_requests,
+ blocks_by_range_requests,
+ blobs_by_range_requests,
+ data_columns_by_range_requests,
+ // custody_by_root_requests is a meta request of data_columns_by_root_requests
+ custody_by_root_requests: _,
+ // components_by_range_requests is a meta request of various _by_range requests
+ components_by_range_requests: _,
+ execution_engine_state: _,
+ network_beacon_processor: _,
+ chain: _,
+ fork_context: _,
+ // Don't use a fallback match. We want to be sure that all requests are considered when
+ // adding new ones
+ } = self;
+
+ let mut active_request_count_by_peer = HashMap::<PeerId, usize>::new();
+
+ for peer_id in blocks_by_root_requests
+ .iter_request_peers()
+ .chain(blobs_by_root_requests.iter_request_peers())
+ .chain(data_columns_by_root_requests.iter_request_peers())
+ .chain(blocks_by_range_requests.iter_request_peers())
+ .chain(blobs_by_range_requests.iter_request_peers())
+ .chain(data_columns_by_range_requests.iter_request_peers())
+ {
+ *active_request_count_by_peer.entry(peer_id).or_default() += 1;
+ }
+
+ active_request_count_by_peer
+ }
+
/// A blocks by range request sent by the range sync algorithm
pub fn block_components_by_range_request(
&mut self,
- peer_id: PeerId,
batch_type: ByRangeRequestType,
request: BlocksByRangeRequest,
requester: RangeRequestId,
+ peers: &HashSet<PeerId>,
+ peers_to_deprioritize: &HashSet<PeerId>,
) -> Result<Id, RpcRequestSendError> {
+ let active_request_count_by_peer = self.active_request_count_by_peer();
+
+ let Some(block_peer) = peers
+ .iter()
+ .map(|peer| {
+ (
+ // If contains -> 1 (order after), not contains -> 0 (order first)
+ peers_to_deprioritize.contains(peer),
+ // Prefer peers with less overall requests
+ active_request_count_by_peer.get(peer).copied().unwrap_or(0),
+ // Random factor to break ties, otherwise the PeerID breaks ties
+ rand::random::<u32>(),
+ peer,
+ )
+ })
+ .min()
+ .map(|(_, _, _, peer)| *peer)
+ else {
+ // Backfill and forward sync handle this condition gracefully.
+ // - Backfill sync: will pause waiting for more peers to join
+ // - Forward sync: can never happen as the chain is dropped when removing the last peer.
+ return Err(RpcRequestSendError::NoPeer(NoPeerError::BlockPeer));
+ };
+
+ // Attempt to find all required custody peers before sending any request or creating an ID
+ let columns_by_range_peers_to_request =
+ if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) {
+ let column_indexes = self.network_globals().sampling_columns.clone();
+ Some(self.select_columns_by_range_peers_to_request(
+ &column_indexes,
+ peers,
+ active_request_count_by_peer,
+ peers_to_deprioritize,
+ )?)
+ } else {
+ None
+ };
+
// Create the overall components_by_range request ID before its individual components
let id = ComponentsByRangeRequestId {
id: self.next_id(),
requester,
};
- // Compute custody column peers before sending the blocks_by_range request. If we don't have
- // enough peers, error here.
- let data_column_requests = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) {
- let column_indexes = self.network_globals().sampling_columns.clone();
- Some(self.make_columns_by_range_requests(request.clone(), &column_indexes)?)
- } else {
- None
- };
-
- let blocks_req_id = self.send_blocks_by_range_request(peer_id, request.clone(), id)?;
+ let blocks_req_id = self.send_blocks_by_range_request(block_peer, request.clone(), id)?;
let blobs_req_id = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) {
Some(self.send_blobs_by_range_request(
- peer_id,
+ block_peer,
BlobsByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
@@ -419,64 +508,98 @@ impl SyncNetworkContext {
None
};
- let data_columns = if let Some(data_column_requests) = data_column_requests {
- let data_column_requests = data_column_requests
- .into_iter()
- .map(|(peer_id, columns_by_range_request)| {
- self.send_data_columns_by_range_request(peer_id, columns_by_range_request, id)
- })
- .collect::, _>>()?;
+ let data_column_requests = columns_by_range_peers_to_request
+ .map(|columns_by_range_peers_to_request| {
+ columns_by_range_peers_to_request
+ .into_iter()
+ .map(|(peer_id, columns)| {
+ self.send_data_columns_by_range_request(
+ peer_id,
+ DataColumnsByRangeRequest {
+ start_slot: *request.start_slot(),
+ count: *request.count(),
+ columns,
+ },
+ id,
+ )
+ })
+ .collect::, _>>()
+ })
+ .transpose()?;
- Some((
- data_column_requests,
- self.network_globals()
- .sampling_columns
- .iter()
- .cloned()
- .collect::>(),
- ))
- } else {
- None
- };
-
- let info = RangeBlockComponentsRequest::new(blocks_req_id, blobs_req_id, data_columns);
+ let info = RangeBlockComponentsRequest::new(
+ blocks_req_id,
+ blobs_req_id,
+ data_column_requests.map(|data_column_requests| {
+ (
+ data_column_requests,
+ self.network_globals()
+ .sampling_columns
+ .clone()
+ .iter()
+ .copied()
+ .collect(),
+ )
+ }),
+ );
self.components_by_range_requests.insert(id, info);
Ok(id.id)
}
- fn make_columns_by_range_requests(
+ fn select_columns_by_range_peers_to_request(
&self,
- request: BlocksByRangeRequest,
custody_indexes: &HashSet,
- ) -> Result, RpcRequestSendError> {
- let mut peer_id_to_request_map = HashMap::new();
+ peers: &HashSet,
+ active_request_count_by_peer: HashMap<PeerId, usize>,
+ peers_to_deprioritize: &HashSet,
+ ) -> Result<HashMap<PeerId, Vec<ColumnIndex>>, RpcRequestSendError> {
+ let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
for column_index in custody_indexes {
- // TODO(das): The peer selection logic here needs to be improved - we should probably
- // avoid retrying from failed peers, however `BatchState` currently only tracks the peer
- // serving the blocks.
- let Some(custody_peer) = self.get_random_custodial_peer(*column_index) else {
+ // Strictly consider peers that are custodials of this column AND are part of this
+ // syncing chain. If the forward range sync chain has few peers, it's likely that this
+ // function will not be able to find peers on our custody columns.
+ let Some(custody_peer) = peers
+ .iter()
+ .filter(|peer| {
+ self.network_globals()
+ .is_custody_peer_of(*column_index, peer)
+ })
+ .map(|peer| {
+ (
+ // If contains -> 1 (order after), not contains -> 0 (order first)
+ peers_to_deprioritize.contains(peer),
+ // Prefer peers with less overall requests
+ // Also account for requests that are not yet issued tracked in peer_id_to_request_map
+ // We batch requests to the same peer, so count existance in the
+ // `columns_to_request_by_peer` as a single 1 request.
+ active_request_count_by_peer.get(peer).copied().unwrap_or(0)
+ + columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0),
+ // Random factor to break ties, otherwise the PeerID breaks ties
+ rand::random::<u32>(),
+ peer,
+ )
+ })
+ .min()
+ .map(|(_, _, _, peer)| *peer)
+ else {
// TODO(das): this will be pretty bad UX. To improve we should:
- // - Attempt to fetch custody requests first, before requesting blocks
// - Handle the no peers case gracefully, maybe add some timeout and give a few
// minutes / seconds to the peer manager to locate peers on this subnet before
// abandoing progress on the chain completely.
- return Err(RpcRequestSendError::NoCustodyPeers);
+ return Err(RpcRequestSendError::NoPeer(NoPeerError::CustodyPeer(
+ *column_index,
+ )));
};
- let columns_by_range_request = peer_id_to_request_map
+ columns_to_request_by_peer
.entry(custody_peer)
- .or_insert_with(|| DataColumnsByRangeRequest {
- start_slot: *request.start_slot(),
- count: *request.count(),
- columns: vec![],
- });
-
- columns_by_range_request.columns.push(*column_index);
+ .or_default()
+ .push(*column_index);
}
- Ok(peer_id_to_request_map)
+ Ok(columns_to_request_by_peer)
}
/// Received a blocks by range or blobs by range response for a request that couples blocks '
@@ -536,11 +659,21 @@ impl SyncNetworkContext {
lookup_peers: Arc>>,
block_root: Hash256,
) -> Result {
+ let active_request_count_by_peer = self.active_request_count_by_peer();
let Some(peer_id) = lookup_peers
.read()
.iter()
- .choose(&mut rand::thread_rng())
- .copied()
+ .map(|peer| {
+ (
+ // Prefer peers with less overall requests
+ active_request_count_by_peer.get(peer).copied().unwrap_or(0),
+ // Random factor to break ties, otherwise the PeerID breaks ties
+ rand::random::<u32>(),
+ peer,
+ )
+ })
+ .min()
+ .map(|(_, _, peer)| *peer)
else {
// Allow lookup to not have any peers and do nothing. This is an optimization to not
// lose progress of lookups created from a block with unknown parent before we receive
@@ -597,7 +730,7 @@ impl SyncNetworkContext {
request: RequestType::BlocksByRoot(request.into_request(&self.fork_context)),
app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }),
})
- .map_err(|_| RpcRequestSendError::NetworkSendError)?;
+ .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "BlocksByRoot",
@@ -632,11 +765,21 @@ impl SyncNetworkContext {
block_root: Hash256,
expected_blobs: usize,
) -> Result {
+ let active_request_count_by_peer = self.active_request_count_by_peer();
let Some(peer_id) = lookup_peers
.read()
.iter()
- .choose(&mut rand::thread_rng())
- .copied()
+ .map(|peer| {
+ (
+ // Prefer peers with less overall requests
+ active_request_count_by_peer.get(peer).copied().unwrap_or(0),
+ // Random factor to break ties, otherwise the PeerID breaks ties
+ rand::random::<u32>(),
+ peer,
+ )
+ })
+ .min()
+ .map(|(_, _, peer)| *peer)
else {
// Allow lookup to not have any peers and do nothing. This is an optimization to not
// lose progress of lookups created from a block with unknown parent before we receive
@@ -686,7 +829,7 @@ impl SyncNetworkContext {
request: RequestType::BlobsByRoot(request.clone().into_request(&self.fork_context)),
app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }),
})
- .map_err(|_| RpcRequestSendError::NetworkSendError)?;
+ .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "BlobsByRoot",
@@ -821,7 +964,25 @@ impl SyncNetworkContext {
self.custody_by_root_requests.insert(requester, request);
Ok(LookupRequestResult::RequestSent(id.req_id))
}
- Err(e) => Err(RpcRequestSendError::CustodyRequestError(e)),
+ Err(e) => Err(match e {
+ CustodyRequestError::NoPeer(column_index) => {
+ RpcRequestSendError::NoPeer(NoPeerError::CustodyPeer(column_index))
+ }
+ // - TooManyFailures: Should never happen, `request` has just been created, it's
+ // count of download_failures is 0 here
+ // - BadState: Should never happen, a bad state can only happen when handling a
+ // network response
+ // - UnexpectedRequestId: Never happens: this Err is only constructed handling a
+ // download or processing response
+ // - SendFailed: Should never happen unless in a bad drop sequence when shutting
+ // down the node
+ e @ (CustodyRequestError::TooManyFailures
+ | CustodyRequestError::BadState { .. }
+ | CustodyRequestError::UnexpectedRequestId { .. }
+ | CustodyRequestError::SendFailed { .. }) => {
+ RpcRequestSendError::InternalError(format!("{e:?}"))
+ }
+ }),
}
}
@@ -841,7 +1002,7 @@ impl SyncNetworkContext {
request: RequestType::BlocksByRange(request.clone().into()),
app_request_id: AppRequestId::Sync(SyncRequestId::BlocksByRange(id)),
})
- .map_err(|_| RpcRequestSendError::NetworkSendError)?;
+ .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "BlocksByRange",
@@ -882,7 +1043,7 @@ impl SyncNetworkContext {
request: RequestType::BlobsByRange(request.clone()),
app_request_id: AppRequestId::Sync(SyncRequestId::BlobsByRange(id)),
})
- .map_err(|_| RpcRequestSendError::NetworkSendError)?;
+ .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "BlobsByRange",
@@ -921,7 +1082,7 @@ impl SyncNetworkContext {
request: RequestType::DataColumnsByRange(request.clone()),
app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)),
})
- .map_err(|_| RpcRequestSendError::NetworkSendError)?;
+ .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "DataColumnsByRange",
diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs
index e7e6e62349..f4d010b881 100644
--- a/beacon_node/network/src/sync/network_context/custody.rs
+++ b/beacon_node/network/src/sync/network_context/custody.rs
@@ -45,7 +45,7 @@ pub enum Error {
SendFailed(&'static str),
TooManyFailures,
BadState(String),
- NoPeers(ColumnIndex),
+ NoPeer(ColumnIndex),
/// Received a download result for a different request id than the in-flight request.
/// There should only exist a single request at a time. Having multiple requests is a bug and
/// can result in undefined state, so it's treated as a hard error and the lookup is dropped.
@@ -56,7 +56,6 @@ pub enum Error {
}
struct ActiveBatchColumnsRequest {
- peer_id: PeerId,
indices: Vec<ColumnIndex>,
}
@@ -220,6 +219,7 @@ impl ActiveCustodyRequest {
return Ok(Some((columns, peer_group, max_seen_timestamp)));
}
+ let active_request_count_by_peer = cx.active_request_count_by_peer();
+ let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
let lookup_peers = self.lookup_peers.read();
@@ -238,15 +238,11 @@ impl ActiveCustodyRequest {
// only query the peers on that fork. Should this case be handled? How to handle it?
let custodial_peers = cx.get_custodial_peers(*column_index);
- // TODO(das): cache this computation in a OneCell or similar to prevent having to
- // run it every loop
- let mut active_requests_by_peer = HashMap::<PeerId, usize>::new();
- for batch_request in self.active_batch_columns_requests.values() {
- *active_requests_by_peer
- .entry(batch_request.peer_id)
- .or_default() += 1;
- }
-
+ // We draw from the total set of peers, but prioritize those peers who we have
+ // received an attestation / status / block message claiming to have imported the
+ // lookup. The frequency of those messages is low, so drawing only from lookup_peers
+ // could cause many lookups to take much longer or fail as they don't have enough
+ // custody peers on a given column
let mut priorized_peers = custodial_peers
.iter()
.map(|peer| {
@@ -256,9 +252,12 @@ impl ActiveCustodyRequest {
// De-prioritize peers that have failed to successfully respond to
// requests recently
self.failed_peers.contains(peer),
- // Prefer peers with less requests to load balance across peers
- active_requests_by_peer.get(peer).copied().unwrap_or(0),
- // Final random factor to give all peers a shot in each retry
+ // Prefer peers with fewer requests to load balance across peers.
+ // We batch requests to the same peer, so count existence in the
+ // `columns_to_request_by_peer` as a single 1 request.
+ active_request_count_by_peer.get(peer).copied().unwrap_or(0)
+ + columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0),
+ // Random factor to break ties, otherwise the PeerID breaks ties
rand::thread_rng().gen::<u32>(),
*peer,
)
@@ -276,7 +275,7 @@ impl ActiveCustodyRequest {
// `MAX_STALE_NO_PEERS_DURATION`, else error and drop the request. Note that
// lookup will naturally retry when other peers send us attestations for
// descendants of this un-available lookup.
- return Err(Error::NoPeers(*column_index));
+ return Err(Error::NoPeer(*column_index));
} else {
// Do not issue requests if there is no custody peer on this column
}
@@ -306,13 +305,14 @@ impl ActiveCustodyRequest {
let column_request = self
.column_requests
.get_mut(column_index)
+ // Should never happen: column_index is iterated from column_requests
.ok_or(Error::BadState("unknown column_index".to_owned()))?;
column_request.on_download_start(req_id)?;
}
self.active_batch_columns_requests
- .insert(req_id, ActiveBatchColumnsRequest { indices, peer_id });
+ .insert(req_id, ActiveBatchColumnsRequest { indices });
}
LookupRequestResult::NoRequestNeeded(_) => unreachable!(),
LookupRequestResult::Pending(_) => unreachable!(),
diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs
index c9b85e47b6..963b633ed6 100644
--- a/beacon_node/network/src/sync/network_context/requests.rs
+++ b/beacon_node/network/src/sync/network_context/requests.rs
@@ -179,6 +179,10 @@ impl ActiveRequests {
.collect()
}
+ pub fn iter_request_peers(&self) -> impl Iterator<Item = PeerId> + '_ {
+ self.requests.values().map(|request| request.peer_id)
+ }
+
pub fn len(&self) -> usize {
self.requests.len()
}
diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs
index c1ad550376..264f83ee82 100644
--- a/beacon_node/network/src/sync/range_sync/batch.rs
+++ b/beacon_node/network/src/sync/range_sync/batch.rs
@@ -107,7 +107,7 @@ pub struct BatchInfo {
/// Number of processing attempts that have failed but we do not count.
non_faulty_processing_attempts: u8,
/// The number of download retries this batch has undergone due to a failed request.
- failed_download_attempts: Vec,
+ failed_download_attempts: Vec