diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index d8a52dd010..de4fd29409 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -33,6 +33,7 @@ jobs:
arch: [aarch64-unknown-linux-gnu,
x86_64-unknown-linux-gnu,
x86_64-apple-darwin,
+ aarch64-apple-darwin,
x86_64-windows]
include:
- arch: aarch64-unknown-linux-gnu
@@ -44,6 +45,9 @@ jobs:
- arch: x86_64-apple-darwin
runner: macos-13
profile: maxperf
+ - arch: aarch64-apple-darwin
+ runner: macos-14
+ profile: maxperf
- arch: x86_64-windows
runner: ${{ github.repository == 'sigp/lighthouse' && fromJson('["self-hosted", "windows", "release"]') || 'windows-2019' }}
profile: maxperf
@@ -94,6 +98,10 @@ jobs:
if: matrix.arch == 'x86_64-apple-darwin'
run: cargo install --path lighthouse --force --locked --features portable,gnosis --profile ${{ matrix.profile }}
+ - name: Build Lighthouse for aarch64-apple-darwin
+ if: matrix.arch == 'aarch64-apple-darwin'
+ run: cargo install --path lighthouse --force --locked --features portable,gnosis --profile ${{ matrix.profile }}
+
- name: Build Lighthouse for Windows
if: matrix.arch == 'x86_64-windows'
run: cargo install --path lighthouse --force --locked --features portable,gnosis --profile ${{ matrix.profile }}
@@ -236,13 +244,14 @@ jobs:
| System | Architecture | Binary | PGP Signature |
|:---:|:---:|:---:|:---|
- |
| x86_64 | [lighthouse-${{ env.VERSION }}-x86_64-apple-darwin.tar.gz](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-apple-darwin.tar.gz) | [PGP Signature](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-apple-darwin.tar.gz.asc) |
- |
| x86_64 | [lighthouse-${{ env.VERSION }}-x86_64-unknown-linux-gnu.tar.gz](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-unknown-linux-gnu.tar.gz) | [PGP Signature](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-unknown-linux-gnu.tar.gz.asc) |
- |
| aarch64 | [lighthouse-${{ env.VERSION }}-aarch64-unknown-linux-gnu.tar.gz](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-aarch64-unknown-linux-gnu.tar.gz) | [PGP Signature](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-aarch64-unknown-linux-gnu.tar.gz.asc) |
- |
| x86_64 | [lighthouse-${{ env.VERSION }}-x86_64-windows.tar.gz](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-windows.tar.gz) | [PGP Signature](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-windows.tar.gz.asc) |
+ |
| x86_64 | [lighthouse-${{ env.VERSION }}-x86_64-apple-darwin.tar.gz](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-apple-darwin.tar.gz) | [PGP Signature](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-apple-darwin.tar.gz.asc) |
+ |
| aarch64 | [lighthouse-${{ env.VERSION }}-aarch64-apple-darwin.tar.gz](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-aarch64-apple-darwin.tar.gz) | [PGP Signature](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-aarch64-apple-darwin.tar.gz.asc) |
+ |
| x86_64 | [lighthouse-${{ env.VERSION }}-x86_64-unknown-linux-gnu.tar.gz](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-unknown-linux-gnu.tar.gz) | [PGP Signature](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-unknown-linux-gnu.tar.gz.asc) |
+ |
| aarch64 | [lighthouse-${{ env.VERSION }}-aarch64-unknown-linux-gnu.tar.gz](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-aarch64-unknown-linux-gnu.tar.gz) | [PGP Signature](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-aarch64-unknown-linux-gnu.tar.gz.asc) |
+ |
| x86_64 | [lighthouse-${{ env.VERSION }}-x86_64-windows.tar.gz](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-windows.tar.gz) | [PGP Signature](https://github.com/${{ env.REPO_NAME }}/releases/download/${{ env.VERSION }}/lighthouse-${{ env.VERSION }}-x86_64-windows.tar.gz.asc) |
| | | | |
| **System** | **Option** | - | **Resource** |
- |
| Docker | [${{ env.VERSION }}](https://hub.docker.com/r/${{ env.IMAGE_NAME }}/tags?page=1&ordering=last_updated&name=${{ env.VERSION }}) | [${{ env.IMAGE_NAME }}](https://hub.docker.com/r/${{ env.IMAGE_NAME }}) |
+ |
| Docker | [${{ env.VERSION }}](https://hub.docker.com/r/${{ env.IMAGE_NAME }}/tags?page=1&ordering=last_updated&name=${{ env.VERSION }}) | [${{ env.IMAGE_NAME }}](https://hub.docker.com/r/${{ env.IMAGE_NAME }}) |
ENDBODY
)
assets=(./lighthouse-*.tar.gz*/lighthouse-*.tar.gz*)
diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml
index 817fd9524d..64a93ab5ae 100644
--- a/.github/workflows/test-suite.yml
+++ b/.github/workflows/test-suite.yml
@@ -295,7 +295,7 @@ jobs:
with:
channel: stable
cache-target: release
- - name: Run a basic beacon chain sim that starts from Bellatrix
+ - name: Run a basic beacon chain sim that starts from Deneb
run: cargo run --release --bin simulator basic-sim
fallback-simulator-ubuntu:
name: fallback-simulator-ubuntu
diff --git a/Cargo.lock b/Cargo.lock
index b98e096718..ff87c32783 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -30,6 +30,7 @@ dependencies = [
"filesystem",
"safe_arith",
"sensitive_url",
+ "serde_json",
"slashing_protection",
"slot_clock",
"tempfile",
@@ -812,6 +813,7 @@ dependencies = [
"maplit",
"merkle_proof",
"metrics",
+ "once_cell",
"oneshot_broadcast",
"operation_pool",
"parking_lot 0.12.3",
@@ -877,14 +879,13 @@ name = "beacon_node_fallback"
version = "0.1.0"
dependencies = [
"clap",
- "environment",
"eth2",
"futures",
"itertools 0.10.5",
- "logging",
"serde",
"slot_clock",
"strum",
+ "task_executor",
"tokio",
"tracing",
"types",
@@ -2362,6 +2363,7 @@ dependencies = [
"tokio",
"tracing",
"types",
+ "validator_store",
]
[[package]]
@@ -5633,6 +5635,32 @@ dependencies = [
"unused_port",
]
+[[package]]
+name = "lighthouse_validator_store"
+version = "0.1.0"
+dependencies = [
+ "account_utils",
+ "beacon_node_fallback",
+ "doppelganger_service",
+ "either",
+ "environment",
+ "eth2",
+ "futures",
+ "initialized_validators",
+ "logging",
+ "parking_lot 0.12.3",
+ "serde",
+ "signing_method",
+ "slashing_protection",
+ "slot_clock",
+ "task_executor",
+ "tokio",
+ "tracing",
+ "types",
+ "validator_metrics",
+ "validator_store",
+]
+
[[package]]
name = "lighthouse_version"
version = "0.1.0"
@@ -5730,8 +5758,6 @@ dependencies = [
"chrono",
"logroller",
"metrics",
- "once_cell",
- "parking_lot 0.12.3",
"serde",
"serde_json",
"tokio",
@@ -9667,6 +9693,7 @@ dependencies = [
"graffiti_file",
"hyper 1.6.0",
"initialized_validators",
+ "lighthouse_validator_store",
"metrics",
"monitoring_api",
"parking_lot 0.12.3",
@@ -9722,6 +9749,7 @@ dependencies = [
"health_metrics",
"initialized_validators",
"itertools 0.10.5",
+ "lighthouse_validator_store",
"lighthouse_version",
"logging",
"parking_lot 0.12.3",
@@ -9754,6 +9782,7 @@ name = "validator_http_metrics"
version = "0.1.0"
dependencies = [
"health_metrics",
+ "lighthouse_validator_store",
"lighthouse_version",
"logging",
"malloc_utils",
@@ -9765,7 +9794,6 @@ dependencies = [
"types",
"validator_metrics",
"validator_services",
- "validator_store",
"warp",
"warp_utils",
]
@@ -9808,9 +9836,7 @@ version = "0.1.0"
dependencies = [
"beacon_node_fallback",
"bls",
- "doppelganger_service",
"either",
- "environment",
"eth2",
"futures",
"graffiti_file",
@@ -9818,6 +9844,7 @@ dependencies = [
"parking_lot 0.12.3",
"safe_arith",
"slot_clock",
+ "task_executor",
"tokio",
"tracing",
"tree_hash",
@@ -9830,19 +9857,8 @@ dependencies = [
name = "validator_store"
version = "0.1.0"
dependencies = [
- "account_utils",
- "doppelganger_service",
- "initialized_validators",
- "logging",
- "parking_lot 0.12.3",
- "serde",
- "signing_method",
"slashing_protection",
- "slot_clock",
- "task_executor",
- "tracing",
"types",
- "validator_metrics",
]
[[package]]
@@ -10100,6 +10116,7 @@ dependencies = [
"eth2_network_config",
"futures",
"initialized_validators",
+ "lighthouse_validator_store",
"logging",
"parking_lot 0.12.3",
"reqwest",
diff --git a/Cargo.toml b/Cargo.toml
index 31f50068dc..075552b281 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -96,11 +96,11 @@ members = [
"validator_client/http_api",
"validator_client/http_metrics",
"validator_client/initialized_validators",
+ "validator_client/lighthouse_validator_store",
"validator_client/signing_method",
"validator_client/slashing_protection",
"validator_client/validator_metrics",
"validator_client/validator_services",
- "validator_client/validator_store",
"validator_manager",
]
@@ -161,6 +161,7 @@ maplit = "1"
milhouse = "0.5"
mockito = "1.5.0"
num_cpus = "1"
+once_cell = "1.17.1"
parking_lot = "0.12"
paste = "1"
prometheus = { version = "0.13", default-features = false }
@@ -227,7 +228,6 @@ compare_fields = { path = "common/compare_fields" }
deposit_contract = { path = "common/deposit_contract" }
directory = { path = "common/directory" }
doppelganger_service = { path = "validator_client/doppelganger_service" }
-validator_services = { path = "validator_client/validator_services" }
environment = { path = "lighthouse/environment" }
eth1 = { path = "beacon_node/eth1" }
eth1_test_rig = { path = "testing/eth1_test_rig" }
@@ -249,6 +249,7 @@ int_to_bytes = { path = "consensus/int_to_bytes" }
kzg = { path = "crypto/kzg" }
metrics = { path = "common/metrics" }
lighthouse_network = { path = "beacon_node/lighthouse_network" }
+lighthouse_validator_store = { path = "validator_client/lighthouse_validator_store" }
lighthouse_version = { path = "common/lighthouse_version" }
workspace_members = { path = "common/workspace_members" }
lockfile = { path = "common/lockfile" }
@@ -280,6 +281,7 @@ validator_dir = { path = "common/validator_dir" }
validator_http_api = { path = "validator_client/http_api" }
validator_http_metrics = { path = "validator_client/http_metrics" }
validator_metrics = { path = "validator_client/validator_metrics" }
+validator_services = { path = "validator_client/validator_services" }
validator_store = { path = "validator_client/validator_store" }
validator_test_rig = { path = "testing/validator_test_rig" }
warp_utils = { path = "common/warp_utils" }
diff --git a/Cross.toml b/Cross.toml
index 8181967f32..391e8751c8 100644
--- a/Cross.toml
+++ b/Cross.toml
@@ -4,6 +4,11 @@ pre-build = ["apt-get install -y cmake clang-5.0"]
[target.aarch64-unknown-linux-gnu]
pre-build = ["apt-get install -y cmake clang-5.0"]
+[target.riscv64gc-unknown-linux-gnu]
+pre-build = ["apt-get install -y cmake clang"]
+# Use the most recent Cross image for RISCV because the stable 0.2.5 image doesn't work
+image = "ghcr.io/cross-rs/riscv64gc-unknown-linux-gnu:main"
+
# Allow setting page size limits for jemalloc at build time:
# For certain architectures (like aarch64), we must compile
# jemalloc with support for large page sizes, otherwise the host's
diff --git a/Makefile b/Makefile
index f621f38a63..03bf33a6d8 100644
--- a/Makefile
+++ b/Makefile
@@ -10,6 +10,8 @@ X86_64_TAG = "x86_64-unknown-linux-gnu"
BUILD_PATH_X86_64 = "target/$(X86_64_TAG)/release"
AARCH64_TAG = "aarch64-unknown-linux-gnu"
BUILD_PATH_AARCH64 = "target/$(AARCH64_TAG)/release"
+RISCV64_TAG = "riscv64gc-unknown-linux-gnu"
+BUILD_PATH_RISCV64 = "target/$(RISCV64_TAG)/release"
PINNED_NIGHTLY ?= nightly
@@ -67,6 +69,8 @@ build-aarch64:
# pages, which are commonly used by aarch64 systems.
# See: https://github.com/sigp/lighthouse/issues/5244
JEMALLOC_SYS_WITH_LG_PAGE=16 cross build --bin lighthouse --target aarch64-unknown-linux-gnu --features "portable,$(CROSS_FEATURES)" --profile "$(CROSS_PROFILE)" --locked
+build-riscv64:
+ cross build --bin lighthouse --target riscv64gc-unknown-linux-gnu --features "portable,$(CROSS_FEATURES)" --profile "$(CROSS_PROFILE)" --locked
build-lcli-x86_64:
cross build --bin lcli --target x86_64-unknown-linux-gnu --features "portable" --profile "$(CROSS_PROFILE)" --locked
@@ -75,6 +79,8 @@ build-lcli-aarch64:
# pages, which are commonly used by aarch64 systems.
# See: https://github.com/sigp/lighthouse/issues/5244
JEMALLOC_SYS_WITH_LG_PAGE=16 cross build --bin lcli --target aarch64-unknown-linux-gnu --features "portable" --profile "$(CROSS_PROFILE)" --locked
+build-lcli-riscv64:
+ cross build --bin lcli --target riscv64gc-unknown-linux-gnu --features "portable" --profile "$(CROSS_PROFILE)" --locked
# Create a `.tar.gz` containing a binary for a specific target.
define tarball_release_binary
@@ -95,6 +101,9 @@ build-release-tarballs:
$(call tarball_release_binary,$(BUILD_PATH_X86_64),$(X86_64_TAG),"")
$(MAKE) build-aarch64
$(call tarball_release_binary,$(BUILD_PATH_AARCH64),$(AARCH64_TAG),"")
+ $(MAKE) build-riscv64
+ $(call tarball_release_binary,$(BUILD_PATH_RISCV64),$(RISCV64_TAG),"")
+
# Runs the full workspace tests in **release**, without downloading any additional
# test vectors.
diff --git a/account_manager/Cargo.toml b/account_manager/Cargo.toml
index a7752d621f..071e2681dd 100644
--- a/account_manager/Cargo.toml
+++ b/account_manager/Cargo.toml
@@ -22,6 +22,7 @@ eth2_wallet_manager = { path = "../common/eth2_wallet_manager" }
filesystem = { workspace = true }
safe_arith = { workspace = true }
sensitive_url = { workspace = true }
+serde_json = { workspace = true }
slashing_protection = { workspace = true }
slot_clock = { workspace = true }
tokio = { workspace = true }
diff --git a/account_manager/src/validator/exit.rs b/account_manager/src/validator/exit.rs
index 8a2cdb8400..1393d0f152 100644
--- a/account_manager/src/validator/exit.rs
+++ b/account_manager/src/validator/exit.rs
@@ -11,6 +11,7 @@ use eth2_keystore::Keystore;
use eth2_network_config::Eth2NetworkConfig;
use safe_arith::SafeArith;
use sensitive_url::SensitiveUrl;
+use serde_json;
use slot_clock::{SlotClock, SystemTimeSlotClock};
use std::path::{Path, PathBuf};
use std::time::Duration;
@@ -24,6 +25,7 @@ pub const BEACON_SERVER_FLAG: &str = "beacon-node";
pub const NO_WAIT: &str = "no-wait";
pub const NO_CONFIRMATION: &str = "no-confirmation";
pub const PASSWORD_PROMPT: &str = "Enter the keystore password";
+pub const PRESIGN: &str = "presign";
pub const DEFAULT_BEACON_NODE: &str = "http://localhost:5052/";
pub const CONFIRMATION_PHRASE: &str = "Exit my validator";
@@ -74,6 +76,15 @@ pub fn cli_app() -> Command {
.action(ArgAction::SetTrue)
.help_heading(FLAG_HEADER)
)
+ .arg(
+ Arg::new(PRESIGN)
+ .long(PRESIGN)
+ .help("Only presign the voluntary exit message without publishing it")
+ .default_value("false")
+ .action(ArgAction::SetTrue)
+ .help_heading(FLAG_HEADER)
+ .display_order(0)
+ )
}
pub fn cli_run(matches: &ArgMatches, env: Environment) -> Result<(), String> {
@@ -84,6 +95,7 @@ pub fn cli_run(matches: &ArgMatches, env: Environment) -> Result<
let stdin_inputs = cfg!(windows) || matches.get_flag(STDIN_INPUTS_FLAG);
let no_wait = matches.get_flag(NO_WAIT);
let no_confirmation = matches.get_flag(NO_CONFIRMATION);
+ let presign = matches.get_flag(PRESIGN);
let spec = env.eth2_config().spec.clone();
let server_url: String = clap_utils::parse_required(matches, BEACON_SERVER_FLAG)?;
@@ -107,6 +119,7 @@ pub fn cli_run(matches: &ArgMatches, env: Environment) -> Result<
&eth2_network_config,
no_wait,
no_confirmation,
+ presign,
))?;
Ok(())
@@ -123,6 +136,7 @@ async fn publish_voluntary_exit(
eth2_network_config: &Eth2NetworkConfig,
no_wait: bool,
no_confirmation: bool,
+ presign: bool,
) -> Result<(), String> {
let genesis_data = get_geneisis_data(client).await?;
let testnet_genesis_root = eth2_network_config
@@ -154,6 +168,23 @@ async fn publish_voluntary_exit(
validator_index,
};
+ // Sign the voluntary exit. We sign ahead of the prompt as that step is only important for the broadcast
+ let signed_voluntary_exit =
+ voluntary_exit.sign(&keypair.sk, genesis_data.genesis_validators_root, spec);
+ if presign {
+ eprintln!(
+ "Successfully pre-signed voluntary exit for validator {}. Not publishing.",
+ keypair.pk
+ );
+
+ // Convert to JSON and print
+ let string_output = serde_json::to_string_pretty(&signed_voluntary_exit)
+ .map_err(|e| format!("Unable to convert to JSON: {}", e))?;
+
+ println!("{}", string_output);
+ return Ok(());
+ }
+
eprintln!(
"Publishing a voluntary exit for validator: {} \n",
keypair.pk
@@ -174,9 +205,7 @@ async fn publish_voluntary_exit(
};
if confirmation == CONFIRMATION_PHRASE {
- // Sign and publish the voluntary exit to network
- let signed_voluntary_exit =
- voluntary_exit.sign(&keypair.sk, genesis_data.genesis_validators_root, spec);
+ // Publish the voluntary exit to network
client
.post_beacon_pool_voluntary_exits(&signed_voluntary_exit)
.await
diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml
index 0cf9ae1a10..18b40cab7e 100644
--- a/beacon_node/beacon_chain/Cargo.toml
+++ b/beacon_node/beacon_chain/Cargo.toml
@@ -47,6 +47,7 @@ logging = { workspace = true }
lru = { workspace = true }
merkle_proof = { workspace = true }
metrics = { workspace = true }
+once_cell = { workspace = true }
oneshot_broadcast = { path = "../../common/oneshot_broadcast/" }
operation_pool = { workspace = true }
parking_lot = { workspace = true }
diff --git a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs
index 567433caee..56b13b0b77 100644
--- a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs
+++ b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs
@@ -11,10 +11,12 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use fork_choice::ExecutionStatus;
use lru::LruCache;
+use once_cell::sync::OnceCell;
use smallvec::SmallVec;
use state_processing::state_advance::partial_state_advance;
use std::cmp::Ordering;
use std::num::NonZeroUsize;
+use std::sync::Arc;
use types::non_zero_usize::new_non_zero_usize;
use types::{
BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned,
@@ -39,21 +41,21 @@ pub struct Proposer {
/// their signatures.
pub struct EpochBlockProposers {
/// The epoch to which the proposers pertain.
- epoch: Epoch,
+ pub(crate) epoch: Epoch,
/// The fork that should be used to verify proposer signatures.
- fork: Fork,
+ pub(crate) fork: Fork,
/// A list of length `T::EthSpec::slots_per_epoch()`, representing the proposers for each slot
/// in that epoch.
///
/// E.g., if `self.epoch == 1`, then `self.proposers[0]` contains the proposer for slot `32`.
- proposers: SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>,
+ pub(crate) proposers: SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>,
}
/// A cache to store the proposers for some epoch.
///
/// See the module-level documentation for more information.
pub struct BeaconProposerCache {
- cache: LruCache<(Epoch, Hash256), EpochBlockProposers>,
+ cache: LruCache<(Epoch, Hash256), Arc<OnceCell<EpochBlockProposers>>>,
}
impl Default for BeaconProposerCache {
@@ -74,7 +76,8 @@ impl BeaconProposerCache {
) -> Option<Proposer> {
let epoch = slot.epoch(E::slots_per_epoch());
let key = (epoch, shuffling_decision_block);
- if let Some(cache) = self.cache.get(&key) {
+ let cache_opt = self.cache.get(&key).and_then(|cell| cell.get());
+ if let Some(cache) = cache_opt {
// This `if` statement is likely unnecessary, but it feels like good practice.
if epoch == cache.epoch {
cache
@@ -103,7 +106,26 @@ impl BeaconProposerCache {
epoch: Epoch,
) -> Option<&SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>> {
let key = (epoch, shuffling_decision_block);
- self.cache.get(&key).map(|cache| &cache.proposers)
+ self.cache
+ .get(&key)
+ .and_then(|cache_once_cell| cache_once_cell.get().map(|proposers| &proposers.proposers))
+ }
+
+ /// Returns the `OnceCell` for the given `(epoch, shuffling_decision_block)` key,
+ /// inserting an empty one if it doesn't exist.
+ ///
+ /// The returned `OnceCell` allows the caller to initialise the value externally
+ /// using `get_or_try_init`, enabling deferred computation without holding a mutable
+ /// reference to the cache.
+ pub fn get_or_insert_key(
+ &mut self,
+ epoch: Epoch,
+ shuffling_decision_block: Hash256,
+ ) -> Arc<OnceCell<EpochBlockProposers>> {
+ let key = (epoch, shuffling_decision_block);
+ self.cache
+ .get_or_insert(key, || Arc::new(OnceCell::new()))
+ .clone()
}
/// Insert the proposers into the cache.
@@ -120,14 +142,13 @@ impl BeaconProposerCache {
) -> Result<(), BeaconStateError> {
let key = (epoch, shuffling_decision_block);
if !self.cache.contains(&key) {
- self.cache.put(
- key,
- EpochBlockProposers {
- epoch,
- fork,
- proposers: proposers.into(),
- },
- );
+ let epoch_proposers = EpochBlockProposers {
+ epoch,
+ fork,
+ proposers: proposers.into(),
+ };
+ self.cache
+ .put(key, Arc::new(OnceCell::with_value(epoch_proposers)));
}
Ok(())
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
index f5fd24483a..5b5a6fcc0d 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -331,7 +331,7 @@ impl PendingComponents {
format!(
"block {} blobs {}/{}",
block_count,
- self.verified_blobs.len(),
+ self.verified_blobs.iter().flatten().count(),
num_expected_blobs
)
}
diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs
index 57efbb0a77..20b5c9aa02 100644
--- a/beacon_node/beacon_chain/src/data_column_verification.rs
+++ b/beacon_node/beacon_chain/src/data_column_verification.rs
@@ -1,3 +1,4 @@
+use crate::beacon_proposer_cache::EpochBlockProposers;
use crate::block_verification::{
cheap_state_advance_to_obtain_committees, get_validator_pubkey_cache, process_block_slash_info,
BlockSlashInfo,
@@ -9,7 +10,6 @@ use derivative::Derivative;
use fork_choice::ProtoBlock;
use kzg::{Error as KzgError, Kzg};
use proto_array::Block;
-use slasher::test_utils::E;
use slot_clock::SlotClock;
use ssz_derive::{Decode, Encode};
use std::iter;
@@ -588,28 +588,33 @@ fn verify_proposer_and_signature<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
) -> Result<(), GossipDataColumnError> {
let column_slot = data_column.slot();
- let column_epoch = column_slot.epoch(E::slots_per_epoch());
+ let slots_per_epoch = T::EthSpec::slots_per_epoch();
+ let column_epoch = column_slot.epoch(slots_per_epoch);
let column_index = data_column.index;
let block_root = data_column.block_root();
let block_parent_root = data_column.block_parent_root();
- let proposer_shuffling_root =
- if parent_block.slot.epoch(T::EthSpec::slots_per_epoch()) == column_epoch {
- parent_block
- .next_epoch_shuffling_id
- .shuffling_decision_block
- } else {
- parent_block.root
- };
+ let proposer_shuffling_root = if parent_block.slot.epoch(slots_per_epoch) == column_epoch {
+ parent_block
+ .next_epoch_shuffling_id
+ .shuffling_decision_block
+ } else {
+ parent_block.root
+ };
- let proposer_opt = chain
+ // We lock the cache briefly to get or insert a OnceCell, then drop the lock
+ // before doing proposer shuffling calculation via `OnceCell::get_or_try_init`. This avoids
+ // holding the lock during the computation, while still ensuring the result is cached and
+ // initialised only once.
+ //
+ // This approach exposes the cache internals (`OnceCell` & `EpochBlockProposers`)
+ // as a trade-off for avoiding lock contention.
+ let epoch_proposers_cell = chain
.beacon_proposer_cache
.lock()
- .get_slot::<T::EthSpec>(proposer_shuffling_root, column_slot);
+ .get_or_insert_key(column_epoch, proposer_shuffling_root);
- let (proposer_index, fork) = if let Some(proposer) = proposer_opt {
- (proposer.index, proposer.fork)
- } else {
+ let epoch_proposers = epoch_proposers_cell.get_or_try_init(move || {
debug!(
%block_root,
index = %column_index,
@@ -633,19 +638,20 @@ fn verify_proposer_and_signature(
)?;
let proposers = state.get_beacon_proposer_indices(&chain.spec)?;
- let proposer_index = *proposers
- .get(column_slot.as_usize() % T::EthSpec::slots_per_epoch() as usize)
- .ok_or_else(|| BeaconChainError::NoProposerForSlot(column_slot))?;
-
// Prime the proposer shuffling cache with the newly-learned value.
- chain.beacon_proposer_cache.lock().insert(
- column_epoch,
- proposer_shuffling_root,
- proposers,
- state.fork(),
- )?;
- (proposer_index, state.fork())
- };
+ Ok::<_, GossipDataColumnError>(EpochBlockProposers {
+ epoch: column_epoch,
+ fork: state.fork(),
+ proposers: proposers.into(),
+ })
+ })?;
+
+ let proposer_index = *epoch_proposers
+ .proposers
+ .get(column_slot.as_usize() % slots_per_epoch as usize)
+ .ok_or_else(|| BeaconChainError::NoProposerForSlot(column_slot))?;
+
+ let fork = epoch_proposers.fork;
// Signature verify the signed block header.
let signature_is_valid = {
diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs
index eaaa23130d..704fb3663f 100644
--- a/beacon_node/beacon_chain/src/kzg_utils.rs
+++ b/beacon_node/beacon_chain/src/kzg_utils.rs
@@ -8,9 +8,9 @@ use std::sync::Arc;
use types::beacon_block_body::KzgCommitments;
use types::data_column_sidecar::{Cell, DataColumn, DataColumnSidecarError};
use types::{
- Blob, BlobSidecar, BlobSidecarList, ChainSpec, ColumnIndex, DataColumnSidecar,
- DataColumnSidecarList, EthSpec, Hash256, KzgCommitment, KzgProof, SignedBeaconBlock,
- SignedBeaconBlockHeader, SignedBlindedBeaconBlock,
+ Blob, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList,
+ EthSpec, Hash256, KzgCommitment, KzgProof, SignedBeaconBlock, SignedBeaconBlockHeader,
+ SignedBlindedBeaconBlock,
};
/// Converts a blob ssz List object to an array to be used with the kzg
@@ -79,38 +79,27 @@ pub fn validate_data_columns<'a, E: EthSpec, I>(
where
I: Iterator<Item = &'a Arc<DataColumnSidecar<E>>> + Clone,
{
- let cells = data_column_iter
- .clone()
- .flat_map(|data_column| data_column.column.iter().map(ssz_cell_to_crypto_cell::<E>))
- .collect::<Result<Vec<_>, KzgError>>()?;
+ let mut cells = Vec::new();
+ let mut proofs = Vec::new();
+ let mut column_indices = Vec::new();
+ let mut commitments = Vec::new();
- let proofs = data_column_iter
- .clone()
- .flat_map(|data_column| {
- data_column
- .kzg_proofs
- .iter()
- .map(|&proof| Bytes48::from(proof))
- })
- .collect::<Vec<_>>();
+ for data_column in data_column_iter {
+ let col_index = data_column.index;
- let column_indices = data_column_iter
- .clone()
- .flat_map(|data_column| {
- let col_index = data_column.index;
- data_column.column.iter().map(move |_| col_index)
- })
- .collect::<Vec<_>>();
+ for cell in &data_column.column {
+ cells.push(ssz_cell_to_crypto_cell::<E>(cell)?);
+ column_indices.push(col_index);
+ }
- let commitments = data_column_iter
- .clone()
- .flat_map(|data_column| {
- data_column
- .kzg_commitments
- .iter()
- .map(|&commitment| Bytes48::from(commitment))
- })
- .collect::<Vec<_>>();
+ for &proof in &data_column.kzg_proofs {
+ proofs.push(Bytes48::from(proof));
+ }
+
+ for &commitment in &data_column.kzg_commitments {
+ commitments.push(Bytes48::from(commitment));
+ }
+ }
kzg.verify_cell_proof_batch(&cells, &proofs, column_indices, &commitments)
}
diff --git a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs
index a4f539aea0..2b6e72ae0c 100644
--- a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs
+++ b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs
@@ -452,7 +452,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
if self.early_block_debounce.elapsed() {
warn!(
queue_size = MAXIMUM_QUEUED_BLOCKS,
- msg = "check system clock",
+ msg = "system resources may be saturated",
"Early blocks queue is full"
);
}
@@ -500,7 +500,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
if self.rpc_block_debounce.elapsed() {
warn!(
queue_size = MAXIMUM_QUEUED_BLOCKS,
- msg = "check system clock",
+ msg = "system resources may be saturated",
"RPC blocks queue is full"
);
}
@@ -540,7 +540,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
if self.attestation_delay_debounce.elapsed() {
error!(
queue_size = MAXIMUM_QUEUED_ATTESTATIONS,
- msg = "check system clock",
+ msg = "system resources may be saturated",
"Aggregate attestation delay queue is full"
);
}
@@ -572,7 +572,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
if self.attestation_delay_debounce.elapsed() {
error!(
queue_size = MAXIMUM_QUEUED_ATTESTATIONS,
- msg = "check system clock",
+ msg = "system resources may be saturated",
"Attestation delay queue is full"
);
}
@@ -606,7 +606,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
if self.lc_update_delay_debounce.elapsed() {
error!(
queue_size = MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES,
- msg = "check system clock",
+ msg = "system resources may be saturated",
"Light client updates delay queue is full"
);
}
diff --git a/beacon_node/lighthouse_network/src/metrics.rs b/beacon_node/lighthouse_network/src/metrics.rs
index b36cb8075d..da986f2884 100644
--- a/beacon_node/lighthouse_network/src/metrics.rs
+++ b/beacon_node/lighthouse_network/src/metrics.rs
@@ -206,6 +206,20 @@ pub static REPORT_PEER_MSGS: LazyLock<Result<IntCounterVec>> = LazyLock::new(||
)
});
+pub static OUTBOUND_REQUEST_IDLING: LazyLock<Result<Histogram>> = LazyLock::new(|| {
+ try_create_histogram(
+ "outbound_request_idling_seconds",
+ "The time our own request remained idle in the self-limiter",
+ )
+});
+
+pub static RESPONSE_IDLING: LazyLock<Result<Histogram>> = LazyLock::new(|| {
+ try_create_histogram(
+ "response_idling_seconds",
+ "The time our response remained idle in the response limiter",
+ )
+});
+
pub fn scrape_discovery_metrics() {
let metrics =
discv5::metrics::Metrics::from(discv5::Discv5::::raw_metrics());
diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
index 083887046a..95a4e82fa2 100644
--- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
+++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
@@ -1,6 +1,8 @@
use crate::discovery::enr::PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY;
use crate::discovery::{peer_id_to_node_id, CombinedKey};
-use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId};
+use crate::{
+ metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId, SyncInfo,
+};
use itertools::Itertools;
use logging::crit;
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
@@ -15,7 +17,7 @@ use std::{
use sync_status::SyncStatus;
use tracing::{debug, error, trace, warn};
use types::data_column_custody_group::compute_subnets_for_node;
-use types::{ChainSpec, DataColumnSubnetId, EthSpec};
+use types::{ChainSpec, DataColumnSubnetId, Epoch, EthSpec, Hash256, Slot};
pub mod client;
pub mod peer_info;
@@ -735,6 +737,19 @@ impl PeerDB {
},
);
+ self.update_sync_status(
+ &peer_id,
+ SyncStatus::Synced {
+ // Fill in mock SyncInfo, only for the peer to return `is_synced() == true`.
+ info: SyncInfo {
+ head_slot: Slot::new(0),
+ head_root: Hash256::ZERO,
+ finalized_epoch: Epoch::new(0),
+ finalized_root: Hash256::ZERO,
+ },
+ },
+ );
+
if supernode {
let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
let all_subnets = (0..spec.data_column_sidecar_subnet_count)
diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs
index b86e2b3a6f..33c5521c3b 100644
--- a/beacon_node/lighthouse_network/src/rpc/handler.rs
+++ b/beacon_node/lighthouse_network/src/rpc/handler.rs
@@ -141,7 +141,7 @@ where
/// Waker, to be sure the handler gets polled when needed.
waker: Option,
- /// Timeout that will me used for inbound and outbound responses.
+ /// Timeout that will be used for inbound and outbound responses.
resp_timeout: Duration,
}
@@ -314,6 +314,7 @@ where
}
return;
};
+
// If the response we are sending is an error, report back for handling
if let RpcResponse::Error(ref code, ref reason) = response {
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
@@ -331,6 +332,7 @@ where
"Response not sent. Deactivated handler");
return;
}
+
inbound_info.pending_items.push_back(response);
}
}
diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs
index b748ab11c0..e6939e36d8 100644
--- a/beacon_node/lighthouse_network/src/rpc/methods.rs
+++ b/beacon_node/lighthouse_network/src/rpc/methods.rs
@@ -606,6 +606,20 @@ pub enum ResponseTermination {
LightClientUpdatesByRange,
}
+impl ResponseTermination {
+ pub fn as_protocol(&self) -> Protocol {
+ match self {
+ ResponseTermination::BlocksByRange => Protocol::BlocksByRange,
+ ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot,
+ ResponseTermination::BlobsByRange => Protocol::BlobsByRange,
+ ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot,
+ ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot,
+ ResponseTermination::DataColumnsByRange => Protocol::DataColumnsByRange,
+ ResponseTermination::LightClientUpdatesByRange => Protocol::LightClientUpdatesByRange,
+ }
+ }
+}
+
/// The structured response containing a result/code indicating success or failure
/// and the contents of the response
#[derive(Debug, Clone)]
diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs
index 0f23da7f38..8cb720132a 100644
--- a/beacon_node/lighthouse_network/src/rpc/mod.rs
+++ b/beacon_node/lighthouse_network/src/rpc/mod.rs
@@ -4,7 +4,6 @@
//! direct peer-to-peer communication primarily for sending/receiving chain information for
//! syncing.
-use futures::future::FutureExt;
use handler::RPCHandler;
use libp2p::core::transport::PortUse;
use libp2p::swarm::{
@@ -13,13 +12,12 @@ use libp2p::swarm::{
};
use libp2p::swarm::{ConnectionClosed, FromSwarm, SubstreamProtocol, THandlerInEvent};
use libp2p::PeerId;
-use logging::crit;
-use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
+use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
-use tracing::{debug, instrument, trace};
+use tracing::{debug, error, instrument, trace};
use types::{EthSpec, ForkContext};
pub(crate) use handler::{HandlerErr, HandlerEvent};
@@ -28,6 +26,11 @@ pub(crate) use methods::{
};
pub use protocol::RequestType;
+use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
+use self::protocol::RPCProtocol;
+use self::self_limiter::SelfRateLimiter;
+use crate::rpc::rate_limiter::RateLimiterItem;
+use crate::rpc::response_limiter::ResponseLimiter;
pub use handler::SubstreamId;
pub use methods::{
BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest,
@@ -35,10 +38,6 @@ pub use methods::{
};
pub use protocol::{Protocol, RPCError};
-use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
-use self::protocol::RPCProtocol;
-use self::self_limiter::SelfRateLimiter;
-
pub(crate) mod codec;
pub mod config;
mod handler;
@@ -46,8 +45,12 @@ pub mod methods;
mod outbound;
mod protocol;
mod rate_limiter;
+mod response_limiter;
mod self_limiter;
+// Maximum number of concurrent requests per protocol ID that a client may issue.
+const MAX_CONCURRENT_REQUESTS: usize = 2;
+
/// Composite trait for a request id.
pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {}
impl ReqId for T where T: Send + 'static + std::fmt::Debug + Copy + Clone {}
@@ -144,10 +147,12 @@ pub struct NetworkParams {
/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
/// logic.
pub struct RPC {
- /// Rate limiter
- limiter: Option,
+ /// Rate limiter for our responses.
+ response_limiter: Option>,
/// Rate limiter for our own requests.
- self_limiter: Option>,
+ outbound_request_limiter: SelfRateLimiter,
+ /// Active inbound requests that are awaiting a response.
+ active_inbound_requests: HashMap)>,
/// Queue of events to be processed.
events: Vec>,
fork_context: Arc,
@@ -173,20 +178,20 @@ impl RPC {
network_params: NetworkParams,
seq_number: u64,
) -> Self {
- let inbound_limiter = inbound_rate_limiter_config.map(|config| {
- debug!(?config, "Using inbound rate limiting params");
- RateLimiter::new_with_config(config.0, fork_context.clone())
+ let response_limiter = inbound_rate_limiter_config.map(|config| {
+ debug!(?config, "Using response rate limiting params");
+ ResponseLimiter::new(config, fork_context.clone())
.expect("Inbound limiter configuration parameters are valid")
});
- let self_limiter = outbound_rate_limiter_config.map(|config| {
- SelfRateLimiter::new(config, fork_context.clone())
- .expect("Configuration parameters are valid")
- });
+ let outbound_request_limiter: SelfRateLimiter =
+ SelfRateLimiter::new(outbound_rate_limiter_config, fork_context.clone())
+ .expect("Outbound limiter configuration parameters are valid");
RPC {
- limiter: inbound_limiter,
- self_limiter,
+ response_limiter,
+ outbound_request_limiter,
+ active_inbound_requests: HashMap::new(),
events: Vec::new(),
fork_context,
enable_light_client_server,
@@ -210,6 +215,44 @@ impl RPC {
request_id: InboundRequestId,
response: RpcResponse,
) {
+ let Some((_peer_id, request_type)) = self.active_inbound_requests.remove(&request_id)
+ else {
+ error!(%peer_id, ?request_id, %response, "Request not found in active_inbound_requests. Response not sent");
+ return;
+ };
+
+ // Add the request back to active requests if the response is `Success` and requires stream
+ // termination.
+ if request_type.protocol().terminator().is_some()
+ && matches!(response, RpcResponse::Success(_))
+ {
+ self.active_inbound_requests
+ .insert(request_id, (peer_id, request_type.clone()));
+ }
+
+ self.send_response_inner(peer_id, request_type.protocol(), request_id, response);
+ }
+
+ fn send_response_inner(
+ &mut self,
+ peer_id: PeerId,
+ protocol: Protocol,
+ request_id: InboundRequestId,
+ response: RpcResponse,
+ ) {
+ if let Some(response_limiter) = self.response_limiter.as_mut() {
+ if !response_limiter.allows(
+ peer_id,
+ protocol,
+ request_id.connection_id,
+ request_id.substream_id,
+ response.clone(),
+ ) {
+ // Response is logged and queued internally in the response limiter.
+ return;
+ }
+ }
+
self.events.push(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(request_id.connection_id),
@@ -227,23 +270,19 @@ impl RPC {
skip_all
)]
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType) {
- let event = if let Some(self_limiter) = self.self_limiter.as_mut() {
- match self_limiter.allows(peer_id, request_id, req) {
- Ok(event) => event,
- Err(_e) => {
- // Request is logged and queued internally in the self rate limiter.
- return;
- }
+ match self
+ .outbound_request_limiter
+ .allows(peer_id, request_id, req)
+ {
+ Ok(event) => self.events.push(BehaviourAction::NotifyHandler {
+ peer_id,
+ handler: NotifyHandler::Any,
+ event,
+ }),
+ Err(_e) => {
+ // Request is logged and queued internally in the self rate limiter.
}
- } else {
- RPCSend::Request(request_id, req)
- };
-
- self.events.push(BehaviourAction::NotifyHandler {
- peer_id,
- handler: NotifyHandler::Any,
- event,
- });
+ }
}
/// Lighthouse wishes to disconnect from this peer by sending a Goodbye message. This
@@ -373,20 +412,27 @@ where
if remaining_established > 0 {
return;
}
+
// Get a list of pending requests from the self rate limiter
- if let Some(limiter) = self.self_limiter.as_mut() {
- for (id, proto) in limiter.peer_disconnected(peer_id) {
- let error_msg = ToSwarm::GenerateEvent(RPCMessage {
- peer_id,
- connection_id,
- message: Err(HandlerErr::Outbound {
- id,
- proto,
- error: RPCError::Disconnected,
- }),
- });
- self.events.push(error_msg);
- }
+ for (id, proto) in self.outbound_request_limiter.peer_disconnected(peer_id) {
+ let error_msg = ToSwarm::GenerateEvent(RPCMessage {
+ peer_id,
+ connection_id,
+ message: Err(HandlerErr::Outbound {
+ id,
+ proto,
+ error: RPCError::Disconnected,
+ }),
+ });
+ self.events.push(error_msg);
+ }
+
+ self.active_inbound_requests.retain(
+ |_inbound_request_id, (request_peer_id, _request_type)| *request_peer_id != peer_id,
+ );
+
+ if let Some(limiter) = self.response_limiter.as_mut() {
+ limiter.peer_disconnected(peer_id);
}
// Replace the pending Requests to the disconnected peer
@@ -420,57 +466,39 @@ where
) {
match event {
HandlerEvent::Ok(RPCReceived::Request(request_id, request_type)) => {
- if let Some(limiter) = self.limiter.as_mut() {
- // check if the request is conformant to the quota
- match limiter.allows(&peer_id, &request_type) {
- Err(RateLimitedErr::TooLarge) => {
- // we set the batch sizes, so this is a coding/config err for most protocols
- let protocol = request_type.versioned_protocol().protocol();
- if matches!(
- protocol,
- Protocol::BlocksByRange
- | Protocol::BlobsByRange
- | Protocol::DataColumnsByRange
- | Protocol::BlocksByRoot
- | Protocol::BlobsByRoot
- | Protocol::DataColumnsByRoot
- ) {
- debug!(request = %request_type, %protocol, "Request too large to process");
- } else {
- // Other protocols shouldn't be sending large messages, we should flag the peer kind
- crit!(%protocol, "Request size too large to ever be processed");
- }
- // send an error code to the peer.
- // the handler upon receiving the error code will send it back to the behaviour
- self.send_response(
- peer_id,
- request_id,
- RpcResponse::Error(
- RpcErrorResponse::RateLimited,
- "Rate limited. Request too large".into(),
- ),
- );
- return;
- }
- Err(RateLimitedErr::TooSoon(wait_time)) => {
- debug!(request = %request_type, %peer_id, wait_time_ms = wait_time.as_millis(), "Request exceeds the rate limit");
- // send an error code to the peer.
- // the handler upon receiving the error code will send it back to the behaviour
- self.send_response(
- peer_id,
- request_id,
- RpcResponse::Error(
- RpcErrorResponse::RateLimited,
- format!("Wait {:?}", wait_time).into(),
- ),
- );
- return;
- }
- // No rate limiting, continue.
- Ok(()) => {}
- }
+ let is_concurrent_request_limit_exceeded = self
+ .active_inbound_requests
+ .iter()
+ .filter(
+ |(_inbound_request_id, (request_peer_id, active_request_type))| {
+ *request_peer_id == peer_id
+ && active_request_type.protocol() == request_type.protocol()
+ },
+ )
+ .count()
+ >= MAX_CONCURRENT_REQUESTS;
+
+ // Prevent more than MAX_CONCURRENT_REQUESTS inbound requests per protocol, per peer, from running simultaneously.
+ if is_concurrent_request_limit_exceeded {
+ // There is already an active request with the same protocol. Send an error code to the peer.
+ debug!(request = %request_type, protocol = %request_type.protocol(), %peer_id, "There is an active request with the same protocol");
+ self.send_response_inner(
+ peer_id,
+ request_type.protocol(),
+ request_id,
+ RpcResponse::Error(
+ RpcErrorResponse::RateLimited,
+ format!("Rate limited. There are already {MAX_CONCURRENT_REQUESTS} active requests with the same protocol")
+ .into(),
+ ),
+ );
+ return;
}
+ // Requests below the concurrent-request limit are added to the active inbound requests.
+ self.active_inbound_requests
+ .insert(request_id, (peer_id, request_type.clone()));
+
// If we received a Ping, we queue a Pong response.
if let RequestType::Ping(_) = request_type {
trace!(connection_id = %connection_id, %peer_id, "Received Ping, queueing Pong");
@@ -489,14 +517,38 @@ where
message: Ok(RPCReceived::Request(request_id, request_type)),
}));
}
- HandlerEvent::Ok(rpc) => {
+ HandlerEvent::Ok(RPCReceived::Response(id, response)) => {
+ if response.protocol().terminator().is_none() {
+ // Inform the limiter that a response has been received.
+ self.outbound_request_limiter
+ .request_completed(&peer_id, response.protocol());
+ }
+
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
connection_id,
- message: Ok(rpc),
+ message: Ok(RPCReceived::Response(id, response)),
+ }));
+ }
+ HandlerEvent::Ok(RPCReceived::EndOfStream(id, response_termination)) => {
+ // Inform the limiter that a response has been received.
+ self.outbound_request_limiter
+ .request_completed(&peer_id, response_termination.as_protocol());
+
+ self.events.push(ToSwarm::GenerateEvent(RPCMessage {
+ peer_id,
+ connection_id,
+ message: Ok(RPCReceived::EndOfStream(id, response_termination)),
}));
}
HandlerEvent::Err(err) => {
+ // Inform the limiter that the request has ended with an error.
+ let protocol = match err {
+ HandlerErr::Inbound { proto, .. } | HandlerErr::Outbound { proto, .. } => proto,
+ };
+ self.outbound_request_limiter
+ .request_completed(&peer_id, protocol);
+
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
connection_id,
@@ -514,15 +566,20 @@ where
}
fn poll(&mut self, cx: &mut Context) -> Poll>> {
- // let the rate limiter prune.
- if let Some(limiter) = self.limiter.as_mut() {
- let _ = limiter.poll_unpin(cx);
+ if let Some(response_limiter) = self.response_limiter.as_mut() {
+ if let Poll::Ready(responses) = response_limiter.poll_ready(cx) {
+ for response in responses {
+ self.events.push(ToSwarm::NotifyHandler {
+ peer_id: response.peer_id,
+ handler: NotifyHandler::One(response.connection_id),
+ event: RPCSend::Response(response.substream_id, response.response),
+ });
+ }
+ }
}
- if let Some(self_limiter) = self.self_limiter.as_mut() {
- if let Poll::Ready(event) = self_limiter.poll_ready(cx) {
- self.events.push(event)
- }
+ if let Poll::Ready(event) = self.outbound_request_limiter.poll_ready(cx) {
+ self.events.push(event)
}
if !self.events.is_empty() {
diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs
index b9e82a5f1e..f666c30d52 100644
--- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs
+++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs
@@ -149,7 +149,7 @@ pub struct RPCRateLimiterBuilder {
lcbootstrap_quota: Option,
/// Quota for the LightClientOptimisticUpdate protocol.
lc_optimistic_update_quota: Option,
- /// Quota for the LightClientOptimisticUpdate protocol.
+ /// Quota for the LightClientFinalityUpdate protocol.
lc_finality_update_quota: Option,
/// Quota for the LightClientUpdatesByRange protocol.
lc_updates_by_range_quota: Option,
@@ -275,6 +275,17 @@ impl RateLimiterItem for super::RequestType {
}
}
+impl RateLimiterItem for (super::RpcResponse, Protocol) {
+ fn protocol(&self) -> Protocol {
+ self.1
+ }
+
+ fn max_responses(&self, _current_fork: ForkName, _spec: &ChainSpec) -> u64 {
+ // A response chunk consumes one token of the rate limiter.
+ 1
+ }
+}
+
impl RPCRateLimiter {
pub fn new_with_config(
config: RateLimiterConfig,
diff --git a/beacon_node/lighthouse_network/src/rpc/response_limiter.rs b/beacon_node/lighthouse_network/src/rpc/response_limiter.rs
new file mode 100644
index 0000000000..c583baaadd
--- /dev/null
+++ b/beacon_node/lighthouse_network/src/rpc/response_limiter.rs
@@ -0,0 +1,177 @@
+use crate::rpc::config::InboundRateLimiterConfig;
+use crate::rpc::rate_limiter::{RPCRateLimiter, RateLimitedErr};
+use crate::rpc::self_limiter::timestamp_now;
+use crate::rpc::{Protocol, RpcResponse, SubstreamId};
+use crate::PeerId;
+use futures::FutureExt;
+use libp2p::swarm::ConnectionId;
+use logging::crit;
+use std::collections::hash_map::Entry;
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio_util::time::DelayQueue;
+use tracing::debug;
+use types::{EthSpec, ForkContext};
+
+/// A response that was rate limited or waiting on rate limited responses for the same peer and
+/// protocol.
+#[derive(Clone)]
+pub(super) struct QueuedResponse {
+ pub peer_id: PeerId,
+ pub connection_id: ConnectionId,
+ pub substream_id: SubstreamId,
+ pub response: RpcResponse,
+ pub protocol: Protocol,
+ pub queued_at: Duration,
+}
+
+pub(super) struct ResponseLimiter {
+ /// Rate limiter for our responses.
+ limiter: RPCRateLimiter,
+ /// Responses queued for sending. These responses are stored when the response limiter rejects them.
+ delayed_responses: HashMap<(PeerId, Protocol), VecDeque>>,
+ /// The delay required to allow a peer's outbound response per protocol.
+ next_response: DelayQueue<(PeerId, Protocol)>,
+}
+
+impl ResponseLimiter {
+ /// Creates a new [`ResponseLimiter`] based on configuration values.
+ pub fn new(
+ config: InboundRateLimiterConfig,
+ fork_context: Arc,
+ ) -> Result {
+ Ok(ResponseLimiter {
+ limiter: RPCRateLimiter::new_with_config(config.0, fork_context)?,
+ delayed_responses: HashMap::new(),
+ next_response: DelayQueue::new(),
+ })
+ }
+
+ /// Checks if the rate limiter allows the response. When not allowed, the response is delayed
+ /// until it can be sent.
+ pub fn allows(
+ &mut self,
+ peer_id: PeerId,
+ protocol: Protocol,
+ connection_id: ConnectionId,
+ substream_id: SubstreamId,
+ response: RpcResponse,
+ ) -> bool {
+ // First check that there are not already other responses waiting to be sent.
+ if let Some(queue) = self.delayed_responses.get_mut(&(peer_id, protocol)) {
+ debug!(%peer_id, %protocol, "Response rate limiting since there are already other responses waiting to be sent");
+ queue.push_back(QueuedResponse {
+ peer_id,
+ connection_id,
+ substream_id,
+ response,
+ protocol,
+ queued_at: timestamp_now(),
+ });
+ return false;
+ }
+
+ if let Err(wait_time) =
+ Self::try_limiter(&mut self.limiter, peer_id, response.clone(), protocol)
+ {
+ self.delayed_responses
+ .entry((peer_id, protocol))
+ .or_default()
+ .push_back(QueuedResponse {
+ peer_id,
+ connection_id,
+ substream_id,
+ response,
+ protocol,
+ queued_at: timestamp_now(),
+ });
+ self.next_response.insert((peer_id, protocol), wait_time);
+ return false;
+ }
+
+ true
+ }
+
+ /// Checks if the limiter allows the response. If the response should be delayed, the duration
+ /// to wait is returned.
+ fn try_limiter(
+ limiter: &mut RPCRateLimiter,
+ peer_id: PeerId,
+ response: RpcResponse,
+ protocol: Protocol,
+ ) -> Result<(), Duration> {
+ match limiter.allows(&peer_id, &(response.clone(), protocol)) {
+ Ok(()) => Ok(()),
+ Err(e) => match e {
+ RateLimitedErr::TooLarge => {
+ // This should never happen with default parameters. Let's just send the response.
+ // Log a crit since this is a config issue.
+ crit!(
+ %protocol,
+ "Response rate limiting error for a batch that will never fit. Sending response anyway. Check configuration parameters."
+ );
+ Ok(())
+ }
+ RateLimitedErr::TooSoon(wait_time) => {
+ debug!(%peer_id, %protocol, wait_time_ms = wait_time.as_millis(), "Response rate limiting");
+ Err(wait_time)
+ }
+ },
+ }
+ }
+
+ /// Informs the limiter that a peer has disconnected. This removes any pending responses.
+ pub fn peer_disconnected(&mut self, peer_id: PeerId) {
+ self.delayed_responses
+ .retain(|(map_peer_id, _protocol), _queue| map_peer_id != &peer_id);
+ }
+
+ /// When a peer and protocol are allowed to send a next response, this function checks the
+ /// queued responses and attempts to mark as many of them as ready as the limiter allows.
+ pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>> {
+ let mut responses = vec![];
+ while let Poll::Ready(Some(expired)) = self.next_response.poll_expired(cx) {
+ let (peer_id, protocol) = expired.into_inner();
+
+ if let Entry::Occupied(mut entry) = self.delayed_responses.entry((peer_id, protocol)) {
+ let queue = entry.get_mut();
+ // Take delayed responses from the queue, as long as the limiter allows it.
+ while let Some(response) = queue.pop_front() {
+ match Self::try_limiter(
+ &mut self.limiter,
+ response.peer_id,
+ response.response.clone(),
+ response.protocol,
+ ) {
+ Ok(()) => {
+ metrics::observe_duration(
+ &crate::metrics::RESPONSE_IDLING,
+ timestamp_now().saturating_sub(response.queued_at),
+ );
+ responses.push(response)
+ }
+ Err(wait_time) => {
+ // The response was taken from the queue, but the limiter didn't allow it.
+ queue.push_front(response);
+ self.next_response.insert((peer_id, protocol), wait_time);
+ break;
+ }
+ }
+ }
+ if queue.is_empty() {
+ entry.remove();
+ }
+ }
+ }
+
+ // Prune the rate limiter.
+ let _ = self.limiter.poll_unpin(cx);
+
+ if !responses.is_empty() {
+ return Poll::Ready(responses);
+ }
+ Poll::Pending
+ }
+}
diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs
index e4af977a6c..e5b685676f 100644
--- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs
+++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs
@@ -1,3 +1,10 @@
+use super::{
+ config::OutboundRateLimiterConfig,
+ rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr},
+ BehaviourAction, Protocol, RPCSend, ReqId, RequestType, MAX_CONCURRENT_REQUESTS,
+};
+use crate::rpc::rate_limiter::RateLimiterItem;
+use std::time::{SystemTime, UNIX_EPOCH};
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
sync::Arc,
@@ -13,30 +20,31 @@ use tokio_util::time::DelayQueue;
use tracing::debug;
use types::{EthSpec, ForkContext};
-use super::{
- config::OutboundRateLimiterConfig,
- rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr},
- BehaviourAction, Protocol, RPCSend, ReqId, RequestType,
-};
-
/// A request that was rate limited or waiting on rate limited requests for the same peer and
/// protocol.
struct QueuedRequest {
req: RequestType,
request_id: Id,
+ queued_at: Duration,
}
+/// The number of milliseconds that requests delayed by the concurrent-request limit stay in the queue.
+const WAIT_TIME_DUE_TO_CONCURRENT_REQUESTS: u64 = 100;
+
+#[allow(clippy::type_complexity)]
pub(crate) struct SelfRateLimiter {
- /// Requests queued for sending per peer. This requests are stored when the self rate
+ /// Active requests that are awaiting a response.
+ active_requests: HashMap>,
+ /// Requests queued for sending per peer. These requests are stored when the self rate
/// limiter rejects them. Rate limiting is based on a Peer and Protocol basis, therefore
/// are stored in the same way.
delayed_requests: HashMap<(PeerId, Protocol), VecDeque>>,
/// The delay required to allow a peer's outbound request per protocol.
next_peer_request: DelayQueue<(PeerId, Protocol)>,
/// Rate limiter for our own requests.
- limiter: RateLimiter,
+ rate_limiter: Option,
/// Requests that are ready to be sent.
- ready_requests: SmallVec<[(PeerId, RPCSend); 3]>,
+ ready_requests: SmallVec<[(PeerId, RPCSend, Duration); 3]>,
}
/// Error returned when the rate limiter does not accept a request.
@@ -49,18 +57,23 @@ pub enum Error {
}
impl SelfRateLimiter {
- /// Creates a new [`SelfRateLimiter`] based on configration values.
+ /// Creates a new [`SelfRateLimiter`] based on configuration values.
pub fn new(
- config: OutboundRateLimiterConfig,
+ config: Option,
fork_context: Arc,
) -> Result {
debug!(?config, "Using self rate limiting params");
- let limiter = RateLimiter::new_with_config(config.0, fork_context)?;
+ let rate_limiter = if let Some(c) = config {
+ Some(RateLimiter::new_with_config(c.0, fork_context)?)
+ } else {
+ None
+ };
Ok(SelfRateLimiter {
+ active_requests: Default::default(),
delayed_requests: Default::default(),
next_peer_request: Default::default(),
- limiter,
+ rate_limiter,
ready_requests: Default::default(),
})
}
@@ -77,11 +90,21 @@ impl SelfRateLimiter {
let protocol = req.versioned_protocol().protocol();
// First check that there are not already other requests waiting to be sent.
if let Some(queued_requests) = self.delayed_requests.get_mut(&(peer_id, protocol)) {
- queued_requests.push_back(QueuedRequest { req, request_id });
-
+ debug!(%peer_id, protocol = %req.protocol(), "Self rate limiting since there are already other requests waiting to be sent");
+ queued_requests.push_back(QueuedRequest {
+ req,
+ request_id,
+ queued_at: timestamp_now(),
+ });
return Err(Error::PendingRequests);
}
- match Self::try_send_request(&mut self.limiter, peer_id, request_id, req) {
+ match Self::try_send_request(
+ &mut self.active_requests,
+ &mut self.rate_limiter,
+ peer_id,
+ request_id,
+ req,
+ ) {
Err((rate_limited_req, wait_time)) => {
let key = (peer_id, protocol);
self.next_peer_request.insert(key, wait_time);
@@ -99,33 +122,71 @@ impl SelfRateLimiter {
/// Auxiliary function to deal with self rate limiting outcomes. If the rate limiter allows the
/// request, the [`ToSwarm`] that should be emitted is returned. If the request
/// should be delayed, it's returned with the duration to wait.
+ #[allow(clippy::result_large_err)]
fn try_send_request(
- limiter: &mut RateLimiter,
+ active_requests: &mut HashMap>,
+ rate_limiter: &mut Option,
peer_id: PeerId,
request_id: Id,
req: RequestType,
) -> Result, (QueuedRequest, Duration)> {
- match limiter.allows(&peer_id, &req) {
- Ok(()) => Ok(RPCSend::Request(request_id, req)),
- Err(e) => {
- let protocol = req.versioned_protocol();
- match e {
- RateLimitedErr::TooLarge => {
- // this should never happen with default parameters. Let's just send the request.
- // Log a crit since this is a config issue.
- crit!(
- protocol = %req.versioned_protocol().protocol(),
- "Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters."
- );
- Ok(RPCSend::Request(request_id, req))
- }
- RateLimitedErr::TooSoon(wait_time) => {
- debug!(protocol = %protocol.protocol(), wait_time_ms = wait_time.as_millis(), %peer_id, "Self rate limiting");
- Err((QueuedRequest { req, request_id }, wait_time))
+ if let Some(active_request) = active_requests.get(&peer_id) {
+ if let Some(count) = active_request.get(&req.protocol()) {
+ if *count >= MAX_CONCURRENT_REQUESTS {
+ debug!(
+ %peer_id,
+ protocol = %req.protocol(),
+ "Self rate limiting due to the number of concurrent requests"
+ );
+ return Err((
+ QueuedRequest {
+ req,
+ request_id,
+ queued_at: timestamp_now(),
+ },
+ Duration::from_millis(WAIT_TIME_DUE_TO_CONCURRENT_REQUESTS),
+ ));
+ }
+ }
+ }
+
+ if let Some(limiter) = rate_limiter.as_mut() {
+ match limiter.allows(&peer_id, &req) {
+ Ok(()) => {}
+ Err(e) => {
+ let protocol = req.versioned_protocol();
+ match e {
+ RateLimitedErr::TooLarge => {
+ // this should never happen with default parameters. Let's just send the request.
+ // Log a crit since this is a config issue.
+ crit!(
+ protocol = %req.versioned_protocol().protocol(),
+ "Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters.",
+ );
+ }
+ RateLimitedErr::TooSoon(wait_time) => {
+ debug!(protocol = %protocol.protocol(), wait_time_ms = wait_time.as_millis(), %peer_id, "Self rate limiting");
+ return Err((
+ QueuedRequest {
+ req,
+ request_id,
+ queued_at: timestamp_now(),
+ },
+ wait_time,
+ ));
+ }
}
}
}
}
+
+ *active_requests
+ .entry(peer_id)
+ .or_default()
+ .entry(req.protocol())
+ .or_default() += 1;
+
+ Ok(RPCSend::Request(request_id, req))
}
/// When a peer and protocol are allowed to send a next request, this function checks the
@@ -133,16 +194,32 @@ impl SelfRateLimiter {
fn next_peer_request_ready(&mut self, peer_id: PeerId, protocol: Protocol) {
if let Entry::Occupied(mut entry) = self.delayed_requests.entry((peer_id, protocol)) {
let queued_requests = entry.get_mut();
- while let Some(QueuedRequest { req, request_id }) = queued_requests.pop_front() {
- match Self::try_send_request(&mut self.limiter, peer_id, request_id, req) {
- Err((rate_limited_req, wait_time)) => {
+ while let Some(QueuedRequest {
+ req,
+ request_id,
+ queued_at,
+ }) = queued_requests.pop_front()
+ {
+ match Self::try_send_request(
+ &mut self.active_requests,
+ &mut self.rate_limiter,
+ peer_id,
+ request_id,
+ req.clone(),
+ ) {
+ Err((_rate_limited_req, wait_time)) => {
let key = (peer_id, protocol);
self.next_peer_request.insert(key, wait_time);
- queued_requests.push_front(rate_limited_req);
+ // Don't push `rate_limited_req` here to prevent `queued_at` from being updated.
+ queued_requests.push_front(QueuedRequest {
+ req,
+ request_id,
+ queued_at,
+ });
// If one fails just wait for the next window that allows sending requests.
return;
}
- Ok(event) => self.ready_requests.push((peer_id, event)),
+ Ok(event) => self.ready_requests.push((peer_id, event, queued_at)),
}
}
if queued_requests.is_empty() {
@@ -156,6 +233,8 @@ impl SelfRateLimiter {
/// Informs the limiter that a peer has disconnected. This removes any pending requests and
/// returns their IDs.
pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> {
+ self.active_requests.remove(&peer_id);
+
// It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map
// should never really be large. So we iterate for simplicity
let mut failed_requests = Vec::new();
@@ -177,19 +256,39 @@ impl SelfRateLimiter {
failed_requests
}
+ /// Informs the limiter that a response has been received.
+ pub fn request_completed(&mut self, peer_id: &PeerId, protocol: Protocol) {
+ if let Some(active_requests) = self.active_requests.get_mut(peer_id) {
+ if let Entry::Occupied(mut entry) = active_requests.entry(protocol) {
+ if *entry.get() > 1 {
+ *entry.get_mut() -= 1;
+ } else {
+ entry.remove();
+ }
+ }
+ }
+ }
+
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> {
// First check the requests that were self rate limited, since those might add events to
- // the queue. Also do this this before rate limiter prunning to avoid removing and
+ // the queue. Also do this before rate limiter pruning to avoid removing and
// immediately adding rate limiting keys.
if let Poll::Ready(Some(expired)) = self.next_peer_request.poll_expired(cx) {
let (peer_id, protocol) = expired.into_inner();
self.next_peer_request_ready(peer_id, protocol);
}
+
// Prune the rate limiter.
- let _ = self.limiter.poll_unpin(cx);
+ if let Some(limiter) = self.rate_limiter.as_mut() {
+ let _ = limiter.poll_unpin(cx);
+ }
// Finally return any queued events.
- if let Some((peer_id, event)) = self.ready_requests.pop() {
+ if let Some((peer_id, event, queued_at)) = self.ready_requests.pop() {
+ metrics::observe_duration(
+ &crate::metrics::OUTBOUND_REQUEST_IDLING,
+ timestamp_now().saturating_sub(queued_at),
+ );
return Poll::Ready(BehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
@@ -201,12 +300,19 @@ impl SelfRateLimiter {
}
}
+/// Returns the duration since the unix epoch.
+pub fn timestamp_now() -> Duration {
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or_else(|_| Duration::from_secs(0))
+}
+
#[cfg(test)]
mod tests {
use crate::rpc::config::{OutboundRateLimiterConfig, RateLimiterConfig};
use crate::rpc::rate_limiter::Quota;
use crate::rpc::self_limiter::SelfRateLimiter;
- use crate::rpc::{Ping, Protocol, RequestType};
+ use crate::rpc::{Ping, Protocol, RPCSend, RequestType};
use crate::service::api_types::{AppRequestId, SingleLookupReqId, SyncRequestId};
use libp2p::PeerId;
use logging::create_test_tracing_subscriber;
@@ -227,7 +333,7 @@ mod tests {
&MainnetEthSpec::default_spec(),
));
let mut limiter: SelfRateLimiter =
- SelfRateLimiter::new(config, fork_context).unwrap();
+ SelfRateLimiter::new(Some(config), fork_context).unwrap();
let peer_id = PeerId::random();
let lookup_id = 0;
@@ -290,4 +396,149 @@ mod tests {
assert_eq!(limiter.ready_requests.len(), 1);
}
}
+
+ /// Test that `next_peer_request_ready` correctly maintains the queue when using the self-limiter without rate limiting.
+ #[tokio::test]
+ async fn test_next_peer_request_ready_concurrent_requests() {
+ let fork_context = std::sync::Arc::new(ForkContext::new::(
+ Slot::new(0),
+ Hash256::ZERO,
+ &MainnetEthSpec::default_spec(),
+ ));
+ let mut limiter: SelfRateLimiter =
+ SelfRateLimiter::new(None, fork_context).unwrap();
+ let peer_id = PeerId::random();
+
+ for i in 1..=5u32 {
+ let result = limiter.allows(
+ peer_id,
+ AppRequestId::Sync(SyncRequestId::SingleBlock {
+ id: SingleLookupReqId {
+ lookup_id: i,
+ req_id: i,
+ },
+ }),
+ RequestType::Ping(Ping { data: i as u64 }),
+ );
+
+ // Check that the limiter allows the first two requests.
+ if i <= 2 {
+ assert!(result.is_ok());
+ } else {
+ assert!(result.is_err());
+ }
+ }
+
+ let queue = limiter
+ .delayed_requests
+ .get(&(peer_id, Protocol::Ping))
+ .unwrap();
+ assert_eq!(3, queue.len());
+
+ // The delayed requests remain even after the next_peer_request_ready call because the responses have not been received.
+ limiter.next_peer_request_ready(peer_id, Protocol::Ping);
+ let queue = limiter
+ .delayed_requests
+ .get(&(peer_id, Protocol::Ping))
+ .unwrap();
+ assert_eq!(3, queue.len());
+
+ limiter.request_completed(&peer_id, Protocol::Ping);
+ limiter.next_peer_request_ready(peer_id, Protocol::Ping);
+
+ let queue = limiter
+ .delayed_requests
+ .get(&(peer_id, Protocol::Ping))
+ .unwrap();
+ assert_eq!(2, queue.len());
+
+ limiter.request_completed(&peer_id, Protocol::Ping);
+ limiter.request_completed(&peer_id, Protocol::Ping);
+ limiter.next_peer_request_ready(peer_id, Protocol::Ping);
+
+ let queue = limiter.delayed_requests.get(&(peer_id, Protocol::Ping));
+ assert!(queue.is_none());
+
+ // Check that the three delayed requests have moved to ready_requests.
+ let mut it = limiter.ready_requests.iter();
+ for i in 3..=5u32 {
+ let (_peer_id, RPCSend::Request(request_id, _), _) = it.next().unwrap() else {
+ unreachable!()
+ };
+
+ assert!(matches!(
+ request_id,
+ AppRequestId::Sync(SyncRequestId::SingleBlock {
+ id: SingleLookupReqId { req_id, .. },
+ }) if *req_id == i
+ ));
+ }
+ }
+
+ #[tokio::test]
+ async fn test_peer_disconnected() {
+ let fork_context = std::sync::Arc::new(ForkContext::new::(
+ Slot::new(0),
+ Hash256::ZERO,
+ &MainnetEthSpec::default_spec(),
+ ));
+ let mut limiter: SelfRateLimiter =
+ SelfRateLimiter::new(None, fork_context).unwrap();
+ let peer1 = PeerId::random();
+ let peer2 = PeerId::random();
+
+ for peer in [peer1, peer2] {
+ for i in 1..=5u32 {
+ let result = limiter.allows(
+ peer,
+ AppRequestId::Sync(SyncRequestId::SingleBlock {
+ id: SingleLookupReqId {
+ lookup_id: i,
+ req_id: i,
+ },
+ }),
+ RequestType::Ping(Ping { data: i as u64 }),
+ );
+
+ // Check that the limiter allows the first two requests.
+ if i <= 2 {
+ assert!(result.is_ok());
+ } else {
+ assert!(result.is_err());
+ }
+ }
+ }
+
+ assert!(limiter.active_requests.contains_key(&peer1));
+ assert!(limiter
+ .delayed_requests
+ .contains_key(&(peer1, Protocol::Ping)));
+ assert!(limiter.active_requests.contains_key(&peer2));
+ assert!(limiter
+ .delayed_requests
+ .contains_key(&(peer2, Protocol::Ping)));
+
+ // Check that the limiter returns the IDs of pending requests and that the IDs are ordered correctly.
+ let mut failed_requests = limiter.peer_disconnected(peer1);
+ for i in 3..=5u32 {
+ let (request_id, _) = failed_requests.remove(0);
+ assert!(matches!(
+ request_id,
+ AppRequestId::Sync(SyncRequestId::SingleBlock {
+ id: SingleLookupReqId { req_id, .. },
+ }) if req_id == i
+ ));
+ }
+
+ // Check that peer1’s active and delayed requests have been removed.
+ assert!(!limiter.active_requests.contains_key(&peer1));
+ assert!(!limiter
+ .delayed_requests
+ .contains_key(&(peer1, Protocol::Ping)));
+
+ assert!(limiter.active_requests.contains_key(&peer2));
+ assert!(limiter
+ .delayed_requests
+ .contains_key(&(peer2, Protocol::Ping)));
+ }
}
diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs
index 3031a0dff7..fd99d93589 100644
--- a/beacon_node/lighthouse_network/src/types/globals.rs
+++ b/beacon_node/lighthouse_network/src/types/globals.rs
@@ -206,6 +206,20 @@ impl NetworkGlobals {
.collect::>()
}
+ /// Returns true if the peer is known and is a custodian of `column_index`
+ pub fn is_custody_peer_of(&self, column_index: ColumnIndex, peer_id: &PeerId) -> bool {
+ self.peers
+ .read()
+ .peer_info(peer_id)
+ .map(|info| {
+ info.is_assigned_to_custody_subnet(&DataColumnSubnetId::from_column_index(
+ column_index,
+ &self.spec,
+ ))
+ })
+ .unwrap_or(false)
+ }
+
/// Returns the TopicConfig to compute the set of Gossip topics for a given fork
pub fn as_topic_config(&self) -> TopicConfig {
TopicConfig {
diff --git a/beacon_node/lighthouse_network/tests/common.rs b/beacon_node/lighthouse_network/tests/common.rs
index d686885ff7..d979ef9265 100644
--- a/beacon_node/lighthouse_network/tests/common.rs
+++ b/beacon_node/lighthouse_network/tests/common.rs
@@ -16,6 +16,7 @@ use types::{
type E = MinimalEthSpec;
+use lighthouse_network::rpc::config::InboundRateLimiterConfig;
use tempfile::Builder as TempBuilder;
/// Returns a dummy fork context
@@ -77,7 +78,11 @@ pub fn build_tracing_subscriber(level: &str, enabled: bool) {
}
}
-pub fn build_config(mut boot_nodes: Vec) -> Arc {
+pub fn build_config(
+ mut boot_nodes: Vec,
+ disable_peer_scoring: bool,
+ inbound_rate_limiter: Option,
+) -> Arc {
let mut config = NetworkConfig::default();
// Find unused ports by using the 0 port.
@@ -93,6 +98,8 @@ pub fn build_config(mut boot_nodes: Vec) -> Arc {
config.enr_address = (Some(std::net::Ipv4Addr::LOCALHOST), None);
config.boot_nodes_enr.append(&mut boot_nodes);
config.network_dir = path.into_path();
+ config.disable_peer_scoring = disable_peer_scoring;
+ config.inbound_rate_limiter_config = inbound_rate_limiter;
Arc::new(config)
}
@@ -102,8 +109,10 @@ pub async fn build_libp2p_instance(
fork_name: ForkName,
chain_spec: Arc,
service_name: String,
+ disable_peer_scoring: bool,
+ inbound_rate_limiter: Option,
) -> Libp2pInstance {
- let config = build_config(boot_nodes);
+ let config = build_config(boot_nodes, disable_peer_scoring, inbound_rate_limiter);
// launch libp2p service
let (signal, exit) = async_channel::bounded(1);
@@ -144,6 +153,8 @@ pub async fn build_node_pair(
fork_name: ForkName,
spec: Arc,
protocol: Protocol,
+ disable_peer_scoring: bool,
+ inbound_rate_limiter: Option,
) -> (Libp2pInstance, Libp2pInstance) {
let mut sender = build_libp2p_instance(
rt.clone(),
@@ -151,10 +162,20 @@ pub async fn build_node_pair(
fork_name,
spec.clone(),
"sender".to_string(),
+ disable_peer_scoring,
+ inbound_rate_limiter.clone(),
+ )
+ .await;
+ let mut receiver = build_libp2p_instance(
+ rt,
+ vec![],
+ fork_name,
+ spec.clone(),
+ "receiver".to_string(),
+ disable_peer_scoring,
+ inbound_rate_limiter,
)
.await;
- let mut receiver =
- build_libp2p_instance(rt, vec![], fork_name, spec.clone(), "receiver".to_string()).await;
// let the two nodes set up listeners
let sender_fut = async {
@@ -235,6 +256,8 @@ pub async fn build_linear(
fork_name,
spec.clone(),
"linear".to_string(),
+ false,
+ None,
)
.await,
);
diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs
index 7a0eb4602b..9b43e8b581 100644
--- a/beacon_node/lighthouse_network/tests/rpc_tests.rs
+++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs
@@ -9,10 +9,10 @@ use lighthouse_network::{NetworkEvent, ReportSource, Response};
use ssz::Encode;
use ssz_types::VariableList;
use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};
use tokio::runtime::Runtime;
use tokio::time::sleep;
-use tracing::{debug, warn};
+use tracing::{debug, error, warn};
use types::{
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockBellatrix, BlobSidecar, ChainSpec,
EmptyBlock, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, MinimalEthSpec,
@@ -64,8 +64,15 @@ fn test_tcp_status_rpc() {
rt.block_on(async {
// get sender/receiver
- let (mut sender, mut receiver) =
- common::build_node_pair(Arc::downgrade(&rt), ForkName::Base, spec, Protocol::Tcp).await;
+ let (mut sender, mut receiver) = common::build_node_pair(
+ Arc::downgrade(&rt),
+ ForkName::Base,
+ spec,
+ Protocol::Tcp,
+ false,
+ None,
+ )
+ .await;
// Dummy STATUS RPC message
let rpc_request = RequestType::Status(StatusMessage {
@@ -168,6 +175,8 @@ fn test_tcp_blocks_by_range_chunked_rpc() {
ForkName::Bellatrix,
spec.clone(),
Protocol::Tcp,
+ false,
+ None,
)
.await;
@@ -311,6 +320,8 @@ fn test_blobs_by_range_chunked_rpc() {
ForkName::Deneb,
spec.clone(),
Protocol::Tcp,
+ false,
+ None,
)
.await;
@@ -430,6 +441,8 @@ fn test_tcp_blocks_by_range_over_limit() {
ForkName::Bellatrix,
spec.clone(),
Protocol::Tcp,
+ false,
+ None,
)
.await;
@@ -533,6 +546,8 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() {
ForkName::Base,
spec.clone(),
Protocol::Tcp,
+ false,
+ None,
)
.await;
@@ -665,6 +680,8 @@ fn test_tcp_blocks_by_range_single_empty_rpc() {
ForkName::Base,
spec.clone(),
Protocol::Tcp,
+ false,
+ None,
)
.await;
@@ -785,6 +802,8 @@ fn test_tcp_blocks_by_root_chunked_rpc() {
ForkName::Bellatrix,
spec.clone(),
Protocol::Tcp,
+ false,
+ None,
)
.await;
@@ -929,6 +948,8 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
ForkName::Base,
spec.clone(),
Protocol::Tcp,
+ false,
+ None,
)
.await;
@@ -1065,8 +1086,15 @@ fn goodbye_test(log_level: &str, enable_logging: bool, protocol: Protocol) {
// get sender/receiver
rt.block_on(async {
- let (mut sender, mut receiver) =
- common::build_node_pair(Arc::downgrade(&rt), ForkName::Base, spec, protocol).await;
+ let (mut sender, mut receiver) = common::build_node_pair(
+ Arc::downgrade(&rt),
+ ForkName::Base,
+ spec,
+ protocol,
+ false,
+ None,
+ )
+ .await;
// build the sender future
let sender_future = async {
@@ -1127,3 +1155,239 @@ fn quic_test_goodbye_rpc() {
let enabled_logging = false;
goodbye_test(log_level, enabled_logging, Protocol::Quic);
}
+
+// Test that the receiver delays the responses during response rate-limiting.
+#[test]
+fn test_delayed_rpc_response() {
+ let rt = Arc::new(Runtime::new().unwrap());
+ let spec = Arc::new(E::default_spec());
+
+ // Allow 1 token to be used every 3 seconds.
+ const QUOTA_SEC: u64 = 3;
+
+ rt.block_on(async {
+ // get sender/receiver
+ let (mut sender, mut receiver) = common::build_node_pair(
+ Arc::downgrade(&rt),
+ ForkName::Base,
+ spec,
+ Protocol::Tcp,
+ false,
+ // Configure a quota for STATUS responses of 1 token every 3 seconds.
+ Some(format!("status:1/{QUOTA_SEC}").parse().unwrap()),
+ )
+ .await;
+
+ // Dummy STATUS RPC message
+ let rpc_request = RequestType::Status(StatusMessage {
+ fork_digest: [0; 4],
+ finalized_root: Hash256::from_low_u64_be(0),
+ finalized_epoch: Epoch::new(1),
+ head_root: Hash256::from_low_u64_be(0),
+ head_slot: Slot::new(1),
+ });
+
+ // Dummy STATUS RPC message
+ let rpc_response = Response::Status(StatusMessage {
+ fork_digest: [0; 4],
+ finalized_root: Hash256::from_low_u64_be(0),
+ finalized_epoch: Epoch::new(1),
+ head_root: Hash256::from_low_u64_be(0),
+ head_slot: Slot::new(1),
+ });
+
+ // build the sender future
+ let sender_future = async {
+ let mut request_id = 1;
+ let mut request_sent_at = Instant::now();
+ loop {
+ match sender.next_event().await {
+ NetworkEvent::PeerConnectedOutgoing(peer_id) => {
+ debug!(%request_id, "Sending RPC request");
+ sender
+ .send_request(peer_id, AppRequestId::Router, rpc_request.clone())
+ .unwrap();
+ request_sent_at = Instant::now();
+ }
+ NetworkEvent::ResponseReceived {
+ peer_id,
+ app_request_id: _,
+ response,
+ } => {
+ debug!(%request_id, "Sender received");
+ assert_eq!(response, rpc_response);
+
+ match request_id {
+ 1 => {
+ // The first response is returned instantly.
+ assert!(request_sent_at.elapsed() < Duration::from_millis(100));
+ }
+ 2..=5 => {
+ // The second and subsequent responses are delayed due to the response rate-limiter on the receiver side.
+ // Adding a slight margin to the elapsed time check to account for potential timing issues caused by system
+ // scheduling or execution delays during testing.
+ assert!(
+ request_sent_at.elapsed()
+ > (Duration::from_secs(QUOTA_SEC)
+ - Duration::from_millis(100))
+ );
+ if request_id == 5 {
+ // End the test
+ return;
+ }
+ }
+ _ => unreachable!(),
+ }
+
+ request_id += 1;
+ debug!(%request_id, "Sending RPC request");
+ sender
+ .send_request(peer_id, AppRequestId::Router, rpc_request.clone())
+ .unwrap();
+ request_sent_at = Instant::now();
+ }
+ NetworkEvent::RPCFailed {
+ app_request_id: _,
+ peer_id: _,
+ error,
+ } => {
+ error!(?error, "RPC Failed");
+ panic!("Rpc failed.");
+ }
+ _ => {}
+ }
+ }
+ };
+
+ // build the receiver future
+ let receiver_future = async {
+ loop {
+ if let NetworkEvent::RequestReceived {
+ peer_id,
+ inbound_request_id,
+ request_type,
+ } = receiver.next_event().await
+ {
+ assert_eq!(request_type, rpc_request);
+ debug!("Receiver received request");
+ receiver.send_response(peer_id, inbound_request_id, rpc_response.clone());
+ }
+ }
+ };
+
+ tokio::select! {
+ _ = sender_future => {}
+ _ = receiver_future => {}
+ _ = sleep(Duration::from_secs(30)) => {
+ panic!("Future timed out");
+ }
+ }
+ })
+}
+
+// Test that a rate-limited error doesn't occur even if the sender attempts to send many requests at
+// once, thanks to the self-limiter on the sender side.
+#[test]
+fn test_active_requests() {
+ let rt = Arc::new(Runtime::new().unwrap());
+ let spec = Arc::new(E::default_spec());
+
+ rt.block_on(async {
+ // Get sender/receiver.
+ let (mut sender, mut receiver) = common::build_node_pair(
+ Arc::downgrade(&rt),
+ ForkName::Base,
+ spec,
+ Protocol::Tcp,
+ false,
+ None,
+ )
+ .await;
+
+ // Dummy STATUS RPC request.
+ let rpc_request = RequestType::Status(StatusMessage {
+ fork_digest: [0; 4],
+ finalized_root: Hash256::from_low_u64_be(0),
+ finalized_epoch: Epoch::new(1),
+ head_root: Hash256::from_low_u64_be(0),
+ head_slot: Slot::new(1),
+ });
+
+ // Dummy STATUS RPC response.
+ let rpc_response = Response::Status(StatusMessage {
+ fork_digest: [0; 4],
+ finalized_root: Hash256::zero(),
+ finalized_epoch: Epoch::new(1),
+ head_root: Hash256::zero(),
+ head_slot: Slot::new(1),
+ });
+
+ // Number of requests.
+ const REQUESTS: u8 = 10;
+
+ // Build the sender future.
+ let sender_future = async {
+ let mut response_received = 0;
+ loop {
+ match sender.next_event().await {
+ NetworkEvent::PeerConnectedOutgoing(peer_id) => {
+ debug!("Sending RPC request");
+ // Send requests in quick succession to intentionally trigger request queueing in the self-limiter.
+ for _ in 0..REQUESTS {
+ sender
+ .send_request(peer_id, AppRequestId::Router, rpc_request.clone())
+ .unwrap();
+ }
+ }
+ NetworkEvent::ResponseReceived { response, .. } => {
+ debug!(?response, "Sender received response");
+ if matches!(response, Response::Status(_)) {
+ response_received += 1;
+ }
+ }
+ NetworkEvent::RPCFailed {
+ app_request_id: _,
+ peer_id: _,
+ error,
+ } => panic!("RPC failed: {:?}", error),
+ _ => {}
+ }
+
+ if response_received == REQUESTS {
+ return;
+ }
+ }
+ };
+
+ // Build the receiver future.
+ let receiver_future = async {
+ let mut received_requests = vec![];
+ loop {
+ tokio::select! {
+ event = receiver.next_event() => {
+ if let NetworkEvent::RequestReceived { peer_id, inbound_request_id, request_type } = event {
+ debug!(?request_type, "Receiver received request");
+ if matches!(request_type, RequestType::Status(_)) {
+ received_requests.push((peer_id, inbound_request_id));
+ }
+ }
+ }
+ // Introduce a delay in sending responses to trigger request queueing on the sender side.
+ _ = sleep(Duration::from_secs(3)) => {
+ for (peer_id, inbound_request_id) in received_requests.drain(..) {
+ receiver.send_response(peer_id, inbound_request_id, rpc_response.clone());
+ }
+ }
+ }
+ }
+ };
+
+ tokio::select! {
+ _ = sender_future => {}
+ _ = receiver_future => {}
+ _ = sleep(Duration::from_secs(30)) => {
+ panic!("Future timed out");
+ }
+ }
+ })
+}
diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs
index d61ea58377..cf0e98cda8 100644
--- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs
@@ -1160,7 +1160,8 @@ impl NetworkBeaconProcessor {
"Processed data column, waiting for other components"
);
- self.attempt_data_column_reconstruction(block_root).await;
+ self.attempt_data_column_reconstruction(block_root, true)
+ .await;
}
},
Err(BlockError::DuplicateFullyImported(_)) => {
diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs
index 9a8edbfa4c..ba681eed14 100644
--- a/beacon_node/network/src/network_beacon_processor/mod.rs
+++ b/beacon_node/network/src/network_beacon_processor/mod.rs
@@ -918,9 +918,13 @@ impl NetworkBeaconProcessor {
///
/// Returns `Some(AvailabilityProcessingStatus)` if reconstruction is successfully performed,
/// otherwise returns `None`.
+ ///
+ /// The `publish_columns` parameter controls whether reconstructed columns should be published
+ /// to the gossip network.
async fn attempt_data_column_reconstruction(
self: &Arc,
block_root: Hash256,
+ publish_columns: bool,
) -> Option {
// Only supernodes attempt reconstruction
if !self.network_globals.is_supernode() {
@@ -930,7 +934,9 @@ impl NetworkBeaconProcessor {
let result = self.chain.reconstruct_data_columns(block_root).await;
match result {
Ok(Some((availability_processing_status, data_columns_to_publish))) => {
- self.publish_data_columns_gradually(data_columns_to_publish, block_root);
+ if publish_columns {
+ self.publish_data_columns_gradually(data_columns_to_publish, block_root);
+ }
match &availability_processing_status {
AvailabilityProcessingStatus::Imported(hash) => {
debug!(
@@ -1141,7 +1147,7 @@ use {
};
#[cfg(test)]
-type TestBeaconChainType =
+pub(crate) type TestBeaconChainType =
Witness, E, MemoryStore, MemoryStore>;
#[cfg(test)]
diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs
index 48ae26c826..31b17a41a4 100644
--- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs
@@ -383,8 +383,12 @@ impl NetworkBeaconProcessor {
);
// Attempt reconstruction here before notifying sync, to avoid sending out more requests
// that we may no longer need.
- if let Some(availability) =
- self.attempt_data_column_reconstruction(block_root).await
+ // We don't publish columns reconstructed from rpc columns to the gossip network,
+ // as these are likely historic columns.
+ let publish_columns = false;
+ if let Some(availability) = self
+ .attempt_data_column_reconstruction(block_root, publish_columns)
+ .await
{
result = Ok(availability)
}
diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs
index 509caf7316..fcef06271f 100644
--- a/beacon_node/network/src/sync/backfill_sync/mod.rs
+++ b/beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -10,7 +10,9 @@
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::manager::BatchProcessResult;
-use crate::sync::network_context::{RangeRequestId, RpcResponseError, SyncNetworkContext};
+use crate::sync::network_context::{
+ RangeRequestId, RpcRequestSendError, RpcResponseError, SyncNetworkContext,
+};
use crate::sync::range_sync::{
BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
};
@@ -20,10 +22,9 @@ use lighthouse_network::service::api_types::Id;
use lighthouse_network::types::{BackFillState, NetworkGlobals};
use lighthouse_network::{PeerAction, PeerId};
use logging::crit;
-use rand::seq::SliceRandom;
use std::collections::{
btree_map::{BTreeMap, Entry},
- HashMap, HashSet,
+ HashSet,
};
use std::sync::Arc;
use tracing::{debug, error, info, instrument, warn};
@@ -121,9 +122,6 @@ pub struct BackFillSync {
/// Sorted map of batches undergoing some kind of processing.
batches: BTreeMap>,
- /// List of peers we are currently awaiting a response for.
- active_requests: HashMap>,
-
/// The current processing batch, if any.
current_processing_batch: Option,
@@ -176,7 +174,6 @@ impl BackFillSync {
let bfs = BackFillSync {
batches: BTreeMap::new(),
- active_requests: HashMap::new(),
processing_target: current_start,
current_start,
last_batch_downloaded: false,
@@ -314,45 +311,11 @@ impl BackFillSync {
skip_all
)]
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
- pub fn peer_disconnected(
- &mut self,
- peer_id: &PeerId,
- network: &mut SyncNetworkContext,
- ) -> Result<(), BackFillError> {
+ pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
if matches!(self.state(), BackFillState::Failed) {
return Ok(());
}
- if let Some(batch_ids) = self.active_requests.remove(peer_id) {
- // fail the batches.
- for id in batch_ids {
- if let Some(batch) = self.batches.get_mut(&id) {
- match batch.download_failed(false) {
- Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
- self.fail_sync(BackFillError::BatchDownloadFailed(id))?;
- }
- Ok(BatchOperationOutcome::Continue) => {}
- Err(e) => {
- self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?;
- }
- }
- // If we have run out of peers in which to retry this batch, the backfill state
- // transitions to a paused state.
- // We still need to reset the state for all the affected batches, so we should not
- // short circuit early.
- if self.retry_batch_download(network, id).is_err() {
- debug!(
- batch_id = %id,
- error = "no synced peers",
- "Batch could not be retried"
- );
- }
- } else {
- debug!(peer = %peer_id, batch = %id, "Batch not found while removing peer");
- }
- }
- }
-
// Remove the peer from the participation list
self.participating_peers.remove(peer_id);
Ok(())
@@ -386,15 +349,12 @@ impl BackFillSync {
return Ok(());
}
debug!(batch_epoch = %batch_id, error = ?err, "Batch download failed");
- if let Some(active_requests) = self.active_requests.get_mut(peer_id) {
- active_requests.remove(&batch_id);
- }
- match batch.download_failed(true) {
+ match batch.download_failed(Some(*peer_id)) {
Err(e) => self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)),
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))
}
- Ok(BatchOperationOutcome::Continue) => self.retry_batch_download(network, batch_id),
+ Ok(BatchOperationOutcome::Continue) => self.send_batch(network, batch_id),
}
} else {
// this could be an error for an old batch, removed when the chain advances
@@ -435,19 +395,11 @@ impl BackFillSync {
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer, and that the
// request_id matches
- // TODO(das): removed peer_id matching as the node may request a different peer for data
- // columns.
if !batch.is_expecting_block(&request_id) {
return Ok(ProcessResult::Successful);
}
- // A stream termination has been sent. This batch has ended. Process a completed batch.
- // Remove the request from the peer's active batches
- self.active_requests
- .get_mut(peer_id)
- .map(|active_requests| active_requests.remove(&batch_id));
-
- match batch.download_completed(blocks) {
+ match batch.download_completed(blocks, *peer_id) {
Ok(received) => {
let awaiting_batches =
self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH;
@@ -488,7 +440,6 @@ impl BackFillSync {
self.set_state(BackFillState::Failed);
// Remove all batches and active requests and participating peers.
self.batches.clear();
- self.active_requests.clear();
self.participating_peers.clear();
self.restart_failed_sync = false;
@@ -622,7 +573,7 @@ impl BackFillSync {
}
};
- let Some(peer) = batch.current_peer() else {
+ let Some(peer) = batch.processing_peer() else {
self.fail_sync(BackFillError::BatchInvalidState(
batch_id,
String::from("Peer does not exist"),
@@ -698,6 +649,8 @@ impl BackFillSync {
);
for peer in self.participating_peers.drain() {
+ // TODO(das): `participating_peers` only includes block peers. Should we
+ // penalize the custody column peers too?
network.report_peer(peer, *penalty, "backfill_batch_failed");
}
self.fail_sync(BackFillError::BatchProcessingFailed(batch_id))
@@ -723,7 +676,7 @@ impl BackFillSync {
{
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
}
- self.retry_batch_download(network, batch_id)?;
+ self.send_batch(network, batch_id)?;
Ok(ProcessResult::Successful)
}
}
@@ -864,12 +817,7 @@ impl BackFillSync {
}
}
}
- BatchState::Downloading(peer, ..) => {
- // remove this batch from the peer's active requests
- if let Some(active_requests) = self.active_requests.get_mut(peer) {
- active_requests.remove(&id);
- }
- }
+ BatchState::Downloading(..) => {}
BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => {
crit!("batch indicates inconsistent chain state while advancing chain")
}
@@ -951,57 +899,10 @@ impl BackFillSync {
self.processing_target = self.current_start;
for id in redownload_queue {
- self.retry_batch_download(network, id)?;
+ self.send_batch(network, id)?;
}
// finally, re-request the failed batch.
- self.retry_batch_download(network, batch_id)
- }
-
- /// Sends and registers the request of a batch awaiting download.
- #[instrument(parent = None,
- level = "info",
- fields(service = "backfill_sync"),
- name = "backfill_sync",
- skip_all
- )]
- fn retry_batch_download(
- &mut self,
- network: &mut SyncNetworkContext,
- batch_id: BatchId,
- ) -> Result<(), BackFillError> {
- let Some(batch) = self.batches.get_mut(&batch_id) else {
- return Ok(());
- };
-
- // Find a peer to request the batch
- let failed_peers = batch.failed_peers();
-
- let new_peer = self
- .network_globals
- .peers
- .read()
- .synced_peers()
- .map(|peer| {
- (
- failed_peers.contains(peer),
- self.active_requests.get(peer).map(|v| v.len()).unwrap_or(0),
- rand::random::(),
- *peer,
- )
- })
- // Sort peers prioritizing unrelated peers with less active requests.
- .min()
- .map(|(_, _, _, peer)| peer);
-
- if let Some(peer) = new_peer {
- self.participating_peers.insert(peer);
- self.send_batch(network, batch_id, peer)
- } else {
- // If we are here the chain has no more synced peers
- info!(reason = "insufficient_synced_peers", "Backfill sync paused");
- self.set_state(BackFillState::Paused);
- Err(BackFillError::Paused)
- }
+ self.send_batch(network, batch_id)
}
/// Requests the batch assigned to the given id from a given peer.
@@ -1015,53 +916,65 @@ impl BackFillSync {
&mut self,
network: &mut SyncNetworkContext,
batch_id: BatchId,
- peer: PeerId,
) -> Result<(), BackFillError> {
if let Some(batch) = self.batches.get_mut(&batch_id) {
+ let synced_peers = self
+ .network_globals
+ .peers
+ .read()
+ .synced_peers()
+ .cloned()
+ .collect::>();
+
let (request, is_blob_batch) = batch.to_blocks_by_range_request();
+ let failed_peers = batch.failed_peers();
match network.block_components_by_range_request(
- peer,
is_blob_batch,
request,
RangeRequestId::BackfillSync { batch_id },
+ &synced_peers,
+ &failed_peers,
) {
Ok(request_id) => {
// inform the batch about the new request
- if let Err(e) = batch.start_downloading_from_peer(peer, request_id) {
+ if let Err(e) = batch.start_downloading(request_id) {
return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0));
}
debug!(epoch = %batch_id, %batch, "Requesting batch");
- // register the batch for this peer
- self.active_requests
- .entry(peer)
- .or_default()
- .insert(batch_id);
return Ok(());
}
- Err(e) => {
- // NOTE: under normal conditions this shouldn't happen but we handle it anyway
- warn!(%batch_id, error = ?e, %batch,"Could not send batch request");
- // register the failed download and check if the batch can be retried
- if let Err(e) = batch.start_downloading_from_peer(peer, 1) {
- return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0));
+ Err(e) => match e {
+ RpcRequestSendError::NoPeer(no_peer) => {
+ // If we are here the chain has no more synced peers
+ info!(
+ "reason" = format!("insufficient_synced_peers({no_peer:?})"),
+ "Backfill sync paused"
+ );
+ self.set_state(BackFillState::Paused);
+ return Err(BackFillError::Paused);
}
- self.active_requests
- .get_mut(&peer)
- .map(|request| request.remove(&batch_id));
+ RpcRequestSendError::InternalError(e) => {
+ // NOTE: under normal conditions this shouldn't happen but we handle it anyway
+ warn!(%batch_id, error = ?e, %batch,"Could not send batch request");
+ // register the failed download and check if the batch can be retried
+ if let Err(e) = batch.start_downloading(1) {
+ return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0));
+ }
- match batch.download_failed(true) {
- Err(e) => {
- self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?
- }
- Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
- self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))?
- }
- Ok(BatchOperationOutcome::Continue) => {
- return self.retry_batch_download(network, batch_id)
+ match batch.download_failed(None) {
+ Err(e) => {
+ self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?
+ }
+ Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
+ self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))?
+ }
+ Ok(BatchOperationOutcome::Continue) => {
+ return self.send_batch(network, batch_id)
+ }
}
}
- }
+ },
}
}
@@ -1093,7 +1006,7 @@ impl BackFillSync {
.collect::>();
for batch_id in batch_ids_to_retry {
- self.retry_batch_download(network, batch_id)?;
+ self.send_batch(network, batch_id)?;
}
Ok(())
}
@@ -1115,34 +1028,16 @@ impl BackFillSync {
}
// find the next pending batch and request it from the peer
-
- // randomize the peers for load balancing
- let mut rng = rand::thread_rng();
- let mut idle_peers = self
- .network_globals
- .peers
- .read()
- .synced_peers()
- .filter(|peer_id| {
- self.active_requests
- .get(peer_id)
- .map(|requests| requests.is_empty())
- .unwrap_or(true)
- })
- .cloned()
- .collect::>();
-
- idle_peers.shuffle(&mut rng);
-
- while let Some(peer) = idle_peers.pop() {
- if let Some(batch_id) = self.include_next_batch(network) {
- // send the batch
- self.send_batch(network, batch_id, peer)?;
- } else {
- // No more batches, simply stop
- return Ok(());
- }
+ // Note: for this function to not infinite loop we must:
+ // - If `include_next_batch` returns Some we MUST increase the count of batches that are
+ // accounted in the `BACKFILL_BATCH_BUFFER_SIZE` limit in the `matches!` statement of
+ // that function.
+ while let Some(batch_id) = self.include_next_batch(network) {
+ // send the batch
+ self.send_batch(network, batch_id)?;
}
+
+ // No more batches, simply stop
Ok(())
}
@@ -1296,3 +1191,73 @@ enum ResetEpochError {
/// The chain has already completed.
SyncCompleted,
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use beacon_chain::test_utils::BeaconChainHarness;
+ use bls::Hash256;
+ use lighthouse_network::{NetworkConfig, SyncInfo, SyncStatus};
+ use rand::prelude::StdRng;
+ use rand::SeedableRng;
+ use types::MinimalEthSpec;
+
+ #[test]
+ fn request_batches_should_not_loop_infinitely() {
+ let harness = BeaconChainHarness::builder(MinimalEthSpec)
+ .default_spec()
+ .deterministic_keypairs(4)
+ .fresh_ephemeral_store()
+ .build();
+
+ let beacon_chain = harness.chain.clone();
+ let slots_per_epoch = MinimalEthSpec::slots_per_epoch();
+
+ let network_globals = Arc::new(NetworkGlobals::new_test_globals(
+ vec![],
+ Arc::new(NetworkConfig::default()),
+ beacon_chain.spec.clone(),
+ ));
+
+ {
+ let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
+ let peer_id = network_globals
+ .peers
+ .write()
+ .__add_connected_peer_testing_only(
+ true,
+ &beacon_chain.spec,
+ k256::ecdsa::SigningKey::random(&mut rng).into(),
+ );
+
+ // Simulate finalized epoch and head being 2 epochs ahead
+ let finalized_epoch = Epoch::new(40);
+ let head_epoch = finalized_epoch + 2;
+ let head_slot = head_epoch.start_slot(slots_per_epoch) + 1;
+
+ network_globals.peers.write().update_sync_status(
+ &peer_id,
+ SyncStatus::Synced {
+ info: SyncInfo {
+ head_slot,
+ head_root: Hash256::random(),
+ finalized_epoch,
+ finalized_root: Hash256::random(),
+ },
+ },
+ );
+ }
+
+ let mut network = SyncNetworkContext::new_for_testing(
+ beacon_chain.clone(),
+ network_globals.clone(),
+ harness.runtime.task_executor.clone(),
+ );
+
+ let mut backfill = BackFillSync::new(beacon_chain, network_globals);
+ backfill.set_state(BackFillState::Syncing);
+
+ // if this ends up running into an infinite loop, the test will overflow the stack pretty quickly.
+ let _ = backfill.request_batches(&mut network);
+ }
+}
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index 84e492c04f..9119b1652c 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -515,9 +515,7 @@ impl SyncManager {
// Remove peer from all data structures
self.range_sync.peer_disconnect(&mut self.network, peer_id);
- let _ = self
- .backfill_sync
- .peer_disconnected(peer_id, &mut self.network);
+ let _ = self.backfill_sync.peer_disconnected(peer_id);
self.block_lookups.peer_disconnected(peer_id);
// Regardless of the outcome, we update the sync status.
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index 2cb5ec9a0a..d9eda651e7 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -9,6 +9,8 @@ use super::range_sync::ByRangeRequestType;
use super::SyncMessage;
use crate::metrics;
use crate::network_beacon_processor::NetworkBeaconProcessor;
+#[cfg(test)]
+use crate::network_beacon_processor::TestBeaconChainType;
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::SingleLookupId;
@@ -27,18 +29,20 @@ use lighthouse_network::service::api_types::{
};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource};
use parking_lot::RwLock;
-use rand::prelude::IteratorRandom;
-use rand::thread_rng;
pub use requests::LookupVerifyError;
use requests::{
ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems,
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
};
+#[cfg(test)]
+use slot_clock::SlotClock;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
+#[cfg(test)]
+use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tracing::{debug, error, span, warn, Level};
use types::blob_sidecar::FixedBlobSidecarList;
@@ -82,24 +86,18 @@ pub enum RpcResponseError {
#[derive(Debug, PartialEq, Eq)]
pub enum RpcRequestSendError {
- /// Network channel send failed
- NetworkSendError,
- NoCustodyPeers,
- CustodyRequestError(custody::Error),
- SlotClockError,
+ /// No peer available matching the required criteria
+ NoPeer(NoPeerError),
+ /// These errors should never happen, including unreachable custody errors or network send
+ /// errors.
+ InternalError(String),
}
-impl std::fmt::Display for RpcRequestSendError {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- match self {
- RpcRequestSendError::NetworkSendError => write!(f, "Network send error"),
- RpcRequestSendError::NoCustodyPeers => write!(f, "No custody peers"),
- RpcRequestSendError::CustodyRequestError(e) => {
- write!(f, "Custody request error: {:?}", e)
- }
- RpcRequestSendError::SlotClockError => write!(f, "Slot clock error"),
- }
- }
+/// Type of peer missing that caused a `RpcRequestSendError::NoPeers`
+#[derive(Debug, PartialEq, Eq)]
+pub enum NoPeerError {
+ BlockPeer,
+ CustodyPeer(ColumnIndex),
}
#[derive(Debug, PartialEq, Eq)]
@@ -232,6 +230,35 @@ pub enum RangeBlockComponent {
),
}
+#[cfg(test)]
+impl SyncNetworkContext> {
+ pub fn new_for_testing(
+ beacon_chain: Arc>>,
+ network_globals: Arc>,
+ task_executor: TaskExecutor,
+ ) -> Self {
+ let fork_context = Arc::new(ForkContext::new::(
+ beacon_chain.slot_clock.now().unwrap_or(Slot::new(0)),
+ beacon_chain.genesis_validators_root,
+ &beacon_chain.spec,
+ ));
+ let (network_tx, _network_rx) = mpsc::unbounded_channel();
+ let (beacon_processor, _) = NetworkBeaconProcessor::null_for_testing(
+ network_globals,
+ mpsc::unbounded_channel().0,
+ beacon_chain.clone(),
+ task_executor,
+ );
+
+ SyncNetworkContext::new(
+ network_tx,
+ Arc::new(beacon_processor),
+ beacon_chain,
+ fork_context,
+ )
+ }
+}
+
impl SyncNetworkContext {
pub fn new(
network_send: mpsc::UnboundedSender>,
@@ -331,12 +358,6 @@ impl SyncNetworkContext {
.custody_peers_for_column(column_index)
}
- pub fn get_random_custodial_peer(&self, column_index: ColumnIndex) -> Option {
- self.get_custodial_peers(column_index)
- .into_iter()
- .choose(&mut thread_rng())
- }
-
pub fn network_globals(&self) -> &NetworkGlobals {
&self.network_beacon_processor.network_globals
}
@@ -381,34 +402,102 @@ impl SyncNetworkContext {
}
}
+ fn active_request_count_by_peer(&self) -> HashMap {
+ let Self {
+ network_send: _,
+ request_id: _,
+ blocks_by_root_requests,
+ blobs_by_root_requests,
+ data_columns_by_root_requests,
+ blocks_by_range_requests,
+ blobs_by_range_requests,
+ data_columns_by_range_requests,
+ // custody_by_root_requests is a meta request of data_columns_by_root_requests
+ custody_by_root_requests: _,
+ // components_by_range_requests is a meta request of various _by_range requests
+ components_by_range_requests: _,
+ execution_engine_state: _,
+ network_beacon_processor: _,
+ chain: _,
+ fork_context: _,
+ // Don't use a fallback match. We want to be sure that all requests are considered when
+ // adding new ones
+ } = self;
+
+ let mut active_request_count_by_peer = HashMap::::new();
+
+ for peer_id in blocks_by_root_requests
+ .iter_request_peers()
+ .chain(blobs_by_root_requests.iter_request_peers())
+ .chain(data_columns_by_root_requests.iter_request_peers())
+ .chain(blocks_by_range_requests.iter_request_peers())
+ .chain(blobs_by_range_requests.iter_request_peers())
+ .chain(data_columns_by_range_requests.iter_request_peers())
+ {
+ *active_request_count_by_peer.entry(peer_id).or_default() += 1;
+ }
+
+ active_request_count_by_peer
+ }
+
/// A blocks by range request sent by the range sync algorithm
pub fn block_components_by_range_request(
&mut self,
- peer_id: PeerId,
batch_type: ByRangeRequestType,
request: BlocksByRangeRequest,
requester: RangeRequestId,
+ peers: &HashSet,
+ peers_to_deprioritize: &HashSet,
) -> Result {
+ let active_request_count_by_peer = self.active_request_count_by_peer();
+
+ let Some(block_peer) = peers
+ .iter()
+ .map(|peer| {
+ (
+ // If contains -> 1 (order after), not contains -> 0 (order first)
+ peers_to_deprioritize.contains(peer),
+ // Prefer peers with less overall requests
+ active_request_count_by_peer.get(peer).copied().unwrap_or(0),
+ // Random factor to break ties, otherwise the PeerID breaks ties
+ rand::random::(),
+ peer,
+ )
+ })
+ .min()
+ .map(|(_, _, _, peer)| *peer)
+ else {
+ // Backfill and forward sync handle this condition gracefully.
+ // - Backfill sync: will pause waiting for more peers to join
+ // - Forward sync: can never happen as the chain is dropped when removing the last peer.
+ return Err(RpcRequestSendError::NoPeer(NoPeerError::BlockPeer));
+ };
+
+ // Attempt to find all required custody peers before sending any request or creating an ID
+ let columns_by_range_peers_to_request =
+ if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) {
+ let column_indexes = self.network_globals().sampling_columns.clone();
+ Some(self.select_columns_by_range_peers_to_request(
+ &column_indexes,
+ peers,
+ active_request_count_by_peer,
+ peers_to_deprioritize,
+ )?)
+ } else {
+ None
+ };
+
// Create the overall components_by_range request ID before its individual components
let id = ComponentsByRangeRequestId {
id: self.next_id(),
requester,
};
- // Compute custody column peers before sending the blocks_by_range request. If we don't have
- // enough peers, error here.
- let data_column_requests = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) {
- let column_indexes = self.network_globals().sampling_columns.clone();
- Some(self.make_columns_by_range_requests(request.clone(), &column_indexes)?)
- } else {
- None
- };
-
- let blocks_req_id = self.send_blocks_by_range_request(peer_id, request.clone(), id)?;
+ let blocks_req_id = self.send_blocks_by_range_request(block_peer, request.clone(), id)?;
let blobs_req_id = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) {
Some(self.send_blobs_by_range_request(
- peer_id,
+ block_peer,
BlobsByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
@@ -419,64 +508,98 @@ impl SyncNetworkContext {
None
};
- let data_columns = if let Some(data_column_requests) = data_column_requests {
- let data_column_requests = data_column_requests
- .into_iter()
- .map(|(peer_id, columns_by_range_request)| {
- self.send_data_columns_by_range_request(peer_id, columns_by_range_request, id)
- })
- .collect::, _>>()?;
+ let data_column_requests = columns_by_range_peers_to_request
+ .map(|columns_by_range_peers_to_request| {
+ columns_by_range_peers_to_request
+ .into_iter()
+ .map(|(peer_id, columns)| {
+ self.send_data_columns_by_range_request(
+ peer_id,
+ DataColumnsByRangeRequest {
+ start_slot: *request.start_slot(),
+ count: *request.count(),
+ columns,
+ },
+ id,
+ )
+ })
+ .collect::, _>>()
+ })
+ .transpose()?;
- Some((
- data_column_requests,
- self.network_globals()
- .sampling_columns
- .iter()
- .cloned()
- .collect::>(),
- ))
- } else {
- None
- };
-
- let info = RangeBlockComponentsRequest::new(blocks_req_id, blobs_req_id, data_columns);
+ let info = RangeBlockComponentsRequest::new(
+ blocks_req_id,
+ blobs_req_id,
+ data_column_requests.map(|data_column_requests| {
+ (
+ data_column_requests,
+ self.network_globals()
+ .sampling_columns
+ .clone()
+ .iter()
+ .copied()
+ .collect(),
+ )
+ }),
+ );
self.components_by_range_requests.insert(id, info);
Ok(id.id)
}
- fn make_columns_by_range_requests(
+ fn select_columns_by_range_peers_to_request(
&self,
- request: BlocksByRangeRequest,
custody_indexes: &HashSet,
- ) -> Result, RpcRequestSendError> {
- let mut peer_id_to_request_map = HashMap::new();
+ peers: &HashSet,
+ active_request_count_by_peer: HashMap,
+ peers_to_deprioritize: &HashSet,
+ ) -> Result>, RpcRequestSendError> {
+ let mut columns_to_request_by_peer = HashMap::>::new();
for column_index in custody_indexes {
- // TODO(das): The peer selection logic here needs to be improved - we should probably
- // avoid retrying from failed peers, however `BatchState` currently only tracks the peer
- // serving the blocks.
- let Some(custody_peer) = self.get_random_custodial_peer(*column_index) else {
+ // Strictly consider peers that are custodials of this column AND are part of this
+ // syncing chain. If the forward range sync chain has few peers, it's likely that this
+ // function will not be able to find peers on our custody columns.
+ let Some(custody_peer) = peers
+ .iter()
+ .filter(|peer| {
+ self.network_globals()
+ .is_custody_peer_of(*column_index, peer)
+ })
+ .map(|peer| {
+ (
+ // If contains -> 1 (order after), not contains -> 0 (order first)
+ peers_to_deprioritize.contains(peer),
+ // Prefer peers with less overall requests
+ // Also account for requests that are not yet issued tracked in peer_id_to_request_map
+ // We batch requests to the same peer, so count existance in the
+ // `columns_to_request_by_peer` as a single 1 request.
+ active_request_count_by_peer.get(peer).copied().unwrap_or(0)
+ + columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0),
+ // Random factor to break ties, otherwise the PeerID breaks ties
+ rand::random::(),
+ peer,
+ )
+ })
+ .min()
+ .map(|(_, _, _, peer)| *peer)
+ else {
// TODO(das): this will be pretty bad UX. To improve we should:
- // - Attempt to fetch custody requests first, before requesting blocks
// - Handle the no peers case gracefully, maybe add some timeout and give a few
// minutes / seconds to the peer manager to locate peers on this subnet before
// abandoing progress on the chain completely.
- return Err(RpcRequestSendError::NoCustodyPeers);
+ return Err(RpcRequestSendError::NoPeer(NoPeerError::CustodyPeer(
+ *column_index,
+ )));
};
- let columns_by_range_request = peer_id_to_request_map
+ columns_to_request_by_peer
.entry(custody_peer)
- .or_insert_with(|| DataColumnsByRangeRequest {
- start_slot: *request.start_slot(),
- count: *request.count(),
- columns: vec![],
- });
-
- columns_by_range_request.columns.push(*column_index);
+ .or_default()
+ .push(*column_index);
}
- Ok(peer_id_to_request_map)
+ Ok(columns_to_request_by_peer)
}
/// Received a blocks by range or blobs by range response for a request that couples blocks '
@@ -536,11 +659,21 @@ impl SyncNetworkContext {
lookup_peers: Arc>>,
block_root: Hash256,
) -> Result {
+ let active_request_count_by_peer = self.active_request_count_by_peer();
let Some(peer_id) = lookup_peers
.read()
.iter()
- .choose(&mut rand::thread_rng())
- .copied()
+ .map(|peer| {
+ (
+ // Prefer peers with less overall requests
+ active_request_count_by_peer.get(peer).copied().unwrap_or(0),
+ // Random factor to break ties, otherwise the PeerID breaks ties
+ rand::random::(),
+ peer,
+ )
+ })
+ .min()
+ .map(|(_, _, peer)| *peer)
else {
// Allow lookup to not have any peers and do nothing. This is an optimization to not
// lose progress of lookups created from a block with unknown parent before we receive
@@ -597,7 +730,7 @@ impl SyncNetworkContext {
request: RequestType::BlocksByRoot(request.into_request(&self.fork_context)),
app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }),
})
- .map_err(|_| RpcRequestSendError::NetworkSendError)?;
+ .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "BlocksByRoot",
@@ -632,11 +765,21 @@ impl SyncNetworkContext {
block_root: Hash256,
expected_blobs: usize,
) -> Result {
+ let active_request_count_by_peer = self.active_request_count_by_peer();
let Some(peer_id) = lookup_peers
.read()
.iter()
- .choose(&mut rand::thread_rng())
- .copied()
+ .map(|peer| {
+ (
+ // Prefer peers with less overall requests
+ active_request_count_by_peer.get(peer).copied().unwrap_or(0),
+ // Random factor to break ties, otherwise the PeerID breaks ties
+ rand::random::(),
+ peer,
+ )
+ })
+ .min()
+ .map(|(_, _, peer)| *peer)
else {
// Allow lookup to not have any peers and do nothing. This is an optimization to not
// lose progress of lookups created from a block with unknown parent before we receive
@@ -686,7 +829,7 @@ impl SyncNetworkContext {
request: RequestType::BlobsByRoot(request.clone().into_request(&self.fork_context)),
app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }),
})
- .map_err(|_| RpcRequestSendError::NetworkSendError)?;
+ .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "BlobsByRoot",
@@ -821,7 +964,25 @@ impl SyncNetworkContext {
self.custody_by_root_requests.insert(requester, request);
Ok(LookupRequestResult::RequestSent(id.req_id))
}
- Err(e) => Err(RpcRequestSendError::CustodyRequestError(e)),
+ Err(e) => Err(match e {
+ CustodyRequestError::NoPeer(column_index) => {
+ RpcRequestSendError::NoPeer(NoPeerError::CustodyPeer(column_index))
+ }
+ // - TooManyFailures: Should never happen, `request` has just been created, it's
+ // count of download_failures is 0 here
+ // - BadState: Should never happen, a bad state can only happen when handling a
+ // network response
+ // - UnexpectedRequestId: Never happens: this Err is only constructed handling a
+ // download or processing response
+ // - SendFailed: Should never happen unless in a bad drop sequence when shutting
+ // down the node
+ e @ (CustodyRequestError::TooManyFailures
+ | CustodyRequestError::BadState { .. }
+ | CustodyRequestError::UnexpectedRequestId { .. }
+ | CustodyRequestError::SendFailed { .. }) => {
+ RpcRequestSendError::InternalError(format!("{e:?}"))
+ }
+ }),
}
}
@@ -841,7 +1002,7 @@ impl SyncNetworkContext {
request: RequestType::BlocksByRange(request.clone().into()),
app_request_id: AppRequestId::Sync(SyncRequestId::BlocksByRange(id)),
})
- .map_err(|_| RpcRequestSendError::NetworkSendError)?;
+ .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "BlocksByRange",
@@ -882,7 +1043,7 @@ impl SyncNetworkContext {
request: RequestType::BlobsByRange(request.clone()),
app_request_id: AppRequestId::Sync(SyncRequestId::BlobsByRange(id)),
})
- .map_err(|_| RpcRequestSendError::NetworkSendError)?;
+ .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "BlobsByRange",
@@ -921,7 +1082,7 @@ impl SyncNetworkContext {
request: RequestType::DataColumnsByRange(request.clone()),
app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)),
})
- .map_err(|_| RpcRequestSendError::NetworkSendError)?;
+ .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "DataColumnsByRange",
diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs
index e7e6e62349..f4d010b881 100644
--- a/beacon_node/network/src/sync/network_context/custody.rs
+++ b/beacon_node/network/src/sync/network_context/custody.rs
@@ -45,7 +45,7 @@ pub enum Error {
SendFailed(&'static str),
TooManyFailures,
BadState(String),
- NoPeers(ColumnIndex),
+ NoPeer(ColumnIndex),
/// Received a download result for a different request id than the in-flight request.
/// There should only exist a single request at a time. Having multiple requests is a bug and
/// can result in undefined state, so it's treated as a hard error and the lookup is dropped.
@@ -56,7 +56,6 @@ pub enum Error {
}
struct ActiveBatchColumnsRequest {
- peer_id: PeerId,
indices: Vec,
}
@@ -220,6 +219,7 @@ impl ActiveCustodyRequest {
return Ok(Some((columns, peer_group, max_seen_timestamp)));
}
+ let active_request_count_by_peer = cx.active_request_count_by_peer();
let mut columns_to_request_by_peer = HashMap::>::new();
let lookup_peers = self.lookup_peers.read();
@@ -238,15 +238,11 @@ impl ActiveCustodyRequest {
// only query the peers on that fork. Should this case be handled? How to handle it?
let custodial_peers = cx.get_custodial_peers(*column_index);
- // TODO(das): cache this computation in a OneCell or similar to prevent having to
- // run it every loop
- let mut active_requests_by_peer = HashMap::::new();
- for batch_request in self.active_batch_columns_requests.values() {
- *active_requests_by_peer
- .entry(batch_request.peer_id)
- .or_default() += 1;
- }
-
+ // We draw from the total set of peers, but prioritize those peers who we have
+ // received an attestation / status / block message claiming to have imported the
+ // lookup. The frequency of those messages is low, so drawing only from lookup_peers
+ // could cause many lookups to take much longer or fail as they don't have enough
+ // custody peers on a given column
let mut priorized_peers = custodial_peers
.iter()
.map(|peer| {
@@ -256,9 +252,12 @@ impl ActiveCustodyRequest {
// De-prioritize peers that have failed to successfully respond to
// requests recently
self.failed_peers.contains(peer),
- // Prefer peers with less requests to load balance across peers
- active_requests_by_peer.get(peer).copied().unwrap_or(0),
- // Final random factor to give all peers a shot in each retry
+ // Prefer peers with fewer requests to load balance across peers.
+ // We batch requests to the same peer, so count existence in the
+ // `columns_to_request_by_peer` as a single 1 request.
+ active_request_count_by_peer.get(peer).copied().unwrap_or(0)
+ + columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0),
+ // Random factor to break ties, otherwise the PeerID breaks ties
rand::thread_rng().gen::(),
*peer,
)
@@ -276,7 +275,7 @@ impl ActiveCustodyRequest {
// `MAX_STALE_NO_PEERS_DURATION`, else error and drop the request. Note that
// lookup will naturally retry when other peers send us attestations for
// descendants of this un-available lookup.
- return Err(Error::NoPeers(*column_index));
+ return Err(Error::NoPeer(*column_index));
} else {
// Do not issue requests if there is no custody peer on this column
}
@@ -306,13 +305,14 @@ impl ActiveCustodyRequest {
let column_request = self
.column_requests
.get_mut(column_index)
+ // Should never happen: column_index is iterated from column_requests
.ok_or(Error::BadState("unknown column_index".to_owned()))?;
column_request.on_download_start(req_id)?;
}
self.active_batch_columns_requests
- .insert(req_id, ActiveBatchColumnsRequest { indices, peer_id });
+ .insert(req_id, ActiveBatchColumnsRequest { indices });
}
LookupRequestResult::NoRequestNeeded(_) => unreachable!(),
LookupRequestResult::Pending(_) => unreachable!(),
diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs
index c9b85e47b6..963b633ed6 100644
--- a/beacon_node/network/src/sync/network_context/requests.rs
+++ b/beacon_node/network/src/sync/network_context/requests.rs
@@ -179,6 +179,10 @@ impl