Separate committee subscriptions queue (#3508)

## Issue Addressed

NA

## Proposed Changes

As we've seen on Prater, there seems to be a correlation between these messages

```
WARN Not enough time for a discovery search  subnet_id: ExactSubnet { subnet_id: SubnetId(19), slot: Slot(3742336) }, service: attestation_service
```

... and nodes falling 20-30 slots behind the head for short periods. These nodes are running ~20k Prater validators.

After running some metrics, I can see that the `network_recv` channel is processing ~250k `AttestationSubscribe` messages per minute. It occurred to me that perhaps the `AttestationSubscribe` messages are "washing out" the `SendRequest` and `SendResponse` messages. In this PR I separate the `AttestationSubscribe` and `SyncCommitteeSubscribe` messages into their own queue so the `tokio::select!` in the `NetworkService` can still process the other messages in the `network_recv` channel without necessarily having to clear all the subscription messages first.

~~I've also added filter to the HTTP API to prevent duplicate subscriptions going to the network service.~~

## Additional Info

- Currently being tested on Prater
This commit is contained in:
Paul Hauner
2022-08-30 05:47:31 +00:00
parent ebd0e0e2d9
commit 661307dce1
8 changed files with 231 additions and 102 deletions

View File

@@ -22,7 +22,7 @@ use execution_layer::ExecutionLayer;
use genesis::{interop_genesis_state, Eth1GenesisService, DEFAULT_ETH1_BLOCK_HASH};
use lighthouse_network::{prometheus_client::registry::Registry, NetworkGlobals};
use monitoring_api::{MonitoringHttpClient, ProcessType};
use network::{NetworkConfig, NetworkMessage, NetworkService};
use network::{NetworkConfig, NetworkSenders, NetworkService};
use slasher::Slasher;
use slasher_service::SlasherService;
use slog::{debug, info, warn, Logger};
@@ -31,7 +31,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use timer::spawn_timer;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tokio::sync::oneshot;
use types::{
test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec,
ExecutionBlockHash, Hash256, SignedBeaconBlock,
@@ -66,7 +66,7 @@ pub struct ClientBuilder<T: BeaconChainTypes> {
beacon_chain: Option<Arc<BeaconChain<T>>>,
eth1_service: Option<Eth1Service>,
network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
network_send: Option<UnboundedSender<NetworkMessage<T::EthSpec>>>,
network_senders: Option<NetworkSenders<T::EthSpec>>,
gossipsub_registry: Option<Registry>,
db_path: Option<PathBuf>,
freezer_db_path: Option<PathBuf>,
@@ -98,7 +98,7 @@ where
beacon_chain: None,
eth1_service: None,
network_globals: None,
network_send: None,
network_senders: None,
gossipsub_registry: None,
db_path: None,
freezer_db_path: None,
@@ -397,7 +397,7 @@ where
> = Arc::new(http_api::Context {
config: self.http_api_config.clone(),
chain: None,
network_tx: None,
network_senders: None,
network_globals: None,
eth1_service: Some(genesis_service.eth1_service.clone()),
log: context.log().clone(),
@@ -481,7 +481,7 @@ where
None
};
let (network_globals, network_send) = NetworkService::start(
let (network_globals, network_senders) = NetworkService::start(
beacon_chain,
config,
context.executor,
@@ -493,7 +493,7 @@ where
.map_err(|e| format!("Failed to start network: {:?}", e))?;
self.network_globals = Some(network_globals);
self.network_send = Some(network_send);
self.network_senders = Some(network_senders);
self.gossipsub_registry = gossipsub_registry;
Ok(self)
@@ -537,16 +537,16 @@ where
.beacon_chain
.clone()
.ok_or("slasher service requires a beacon chain")?;
let network_send = self
.network_send
let network_senders = self
.network_senders
.clone()
.ok_or("slasher service requires a network sender")?;
.ok_or("slasher service requires network senders")?;
let context = self
.runtime_context
.as_ref()
.ok_or("slasher requires a runtime_context")?
.service_context("slasher_service_ctxt".into());
SlasherService::new(beacon_chain, network_send).run(&context.executor)
SlasherService::new(beacon_chain, network_senders.network_send()).run(&context.executor)
}
/// Start the explorer client which periodically sends beacon
@@ -616,7 +616,7 @@ where
let ctx = Arc::new(http_api::Context {
config: self.http_api_config.clone(),
chain: self.beacon_chain.clone(),
network_tx: self.network_send.clone(),
network_senders: self.network_senders.clone(),
network_globals: self.network_globals.clone(),
eth1_service: self.eth1_service.clone(),
log: log.clone(),