mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-18 12:22:51 +00:00
Partially addresses https://github.com/sigp/lighthouse/issues/6291. This PR removes the reprocess event channel from being externally exposed. All work events are now sent through the single `BeaconProcessorSend` channel. I've introduced a new `Work::Reprocess` enum variant which we then use to schedule jobs for reprocessing. I've also created a new scheduler module which will eventually house the different scheduler implementations. This is all needed as an initial step to generalize the beacon processor. A "full" implementation for the generalized beacon processor can be found at https://github.com/sigp/lighthouse/pull/6448. I'm going to try to break up the full implementation into smaller PRs so it can actually be reviewed.
930 lines
37 KiB
Rust
930 lines
37 KiB
Rust
use crate::metrics;
|
|
use crate::nat;
|
|
use crate::network_beacon_processor::InvalidBlockStorage;
|
|
use crate::persisted_dht::{clear_dht, load_dht, persist_dht};
|
|
use crate::router::{Router, RouterMessage};
|
|
use crate::subnet_service::{SubnetService, SubnetServiceMessage, Subscription};
|
|
use crate::NetworkConfig;
|
|
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
|
use beacon_processor::BeaconProcessorSend;
|
|
use futures::channel::mpsc::Sender;
|
|
use futures::future::OptionFuture;
|
|
use futures::prelude::*;
|
|
|
|
use lighthouse_network::rpc::InboundRequestId;
|
|
use lighthouse_network::rpc::RequestType;
|
|
use lighthouse_network::service::Network;
|
|
use lighthouse_network::types::GossipKind;
|
|
use lighthouse_network::Enr;
|
|
use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance};
|
|
use lighthouse_network::{
|
|
rpc::{GoodbyeReason, RpcErrorResponse},
|
|
Context, PeerAction, PubsubMessage, ReportSource, Response, Subnet,
|
|
};
|
|
use lighthouse_network::{
|
|
service::api_types::AppRequestId,
|
|
types::{core_topics_to_subscribe, GossipEncoding, GossipTopic},
|
|
MessageId, NetworkEvent, NetworkGlobals, PeerId,
|
|
};
|
|
use logging::crit;
|
|
use std::collections::BTreeSet;
|
|
use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration};
|
|
use store::HotColdDB;
|
|
use strum::IntoStaticStr;
|
|
use task_executor::ShutdownReason;
|
|
use tokio::sync::mpsc;
|
|
use tokio::time::Sleep;
|
|
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
|
|
use types::{
|
|
ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
|
|
Unsigned, ValidatorSubscription,
|
|
};
|
|
|
|
mod tests;
|
|
|
|
/// The interval (in seconds) that various network metrics will update.
const METRIC_UPDATE_INTERVAL: u64 = 5;
/// Number of slots before the fork when we should subscribe to the new fork topics.
const SUBSCRIBE_DELAY_SLOTS: u64 = 2;
/// Delay (in epochs) after a fork where we unsubscribe from the pre-fork topics.
const UNSUBSCRIBE_DELAY_EPOCHS: u64 = 2;
/// Size of the queue for validator subnet subscriptions. The number is chosen so that we may be
/// able to run tens of thousands of validators on one BN.
const VALIDATOR_SUBSCRIPTION_MESSAGE_QUEUE_SIZE: usize = 65_536;
|
|
|
|
/// Types of messages that the network service can receive.
#[derive(Debug, IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub enum NetworkMessage<E: EthSpec> {
    /// Subscribes the beacon node to the core gossipsub topics. We do this when we are either
    /// synced or close to the head slot.
    SubscribeCoreTopics,
    /// Send an RPC request to the libp2p service.
    SendRequest {
        peer_id: PeerId,
        request: RequestType<E>,
        app_request_id: AppRequestId,
    },
    /// Send a successful Response to the libp2p service.
    SendResponse {
        peer_id: PeerId,
        inbound_request_id: InboundRequestId,
        response: Response<E>,
    },
    /// Sends an error response to an RPC request.
    SendErrorResponse {
        peer_id: PeerId,
        inbound_request_id: InboundRequestId,
        error: RpcErrorResponse,
        reason: String,
    },
    /// Publish a list of messages to the gossipsub protocol.
    Publish { messages: Vec<PubsubMessage<E>> },
    /// Validates a received gossipsub message. This will propagate the message on the network.
    ValidationResult {
        /// The peer that sent us the message. We don't send back to this peer.
        propagation_source: PeerId,
        /// The id of the message we are validating and propagating.
        message_id: MessageId,
        /// The result of the validation
        validation_result: MessageAcceptance,
    },
    /// Reports a peer to the peer manager for performing an action.
    ReportPeer {
        peer_id: PeerId,
        action: PeerAction,
        source: ReportSource,
        msg: &'static str,
    },
    /// Disconnect and ban a peer, providing a reason.
    GoodbyePeer {
        peer_id: PeerId,
        reason: GoodbyeReason,
        source: ReportSource,
    },
    /// Connect to a trusted peer and try to maintain the connection.
    ConnectTrustedPeer(Enr),
    /// Disconnect from a trusted peer and remove it from the `trusted_peers` mapping.
    DisconnectTrustedPeer(Enr),
    /// Custody group count changed due to a change in validators' weight.
    /// Subscribe to new subnets and update ENR metadata.
    CustodyCountChanged {
        new_custody_group_count: u64,
        sampling_count: u64,
    },
}
|
|
|
|
/// Messages triggered by validators that may trigger a subscription to a subnet.
///
/// These messages can be very numerous with large validator counts (hundreds of thousands per
/// minute). Therefore we keep them separate from `NetworkMessage` to provide fairness regarding
/// message processing.
#[derive(Debug, IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub enum ValidatorSubscriptionMessage {
    /// Subscribes a list of validators to specific slots for attestation duties.
    AttestationSubscribe {
        subscriptions: BTreeSet<ValidatorSubscription>,
    },
    /// Subscribes a list of validators to sync committee duties.
    SyncCommitteeSubscribe {
        subscriptions: Vec<SyncCommitteeSubscription>,
    },
}
|
|
|
|
/// The sending halves of the network service's inbound channels, handed out to
/// the rest of lighthouse.
#[derive(Clone)]
pub struct NetworkSenders<E: EthSpec> {
    // Unbounded channel for general network messages.
    network_send: mpsc::UnboundedSender<NetworkMessage<E>>,
    // Bounded channel for validator subscription messages, which can be very
    // numerous and therefore need back-pressure.
    validator_subscription_send: mpsc::Sender<ValidatorSubscriptionMessage>,
}
|
|
|
|
/// The receiving halves of the network service's inbound channels, consumed by
/// the service's event loop.
pub struct NetworkReceivers<E: EthSpec> {
    // Receiver paired with `NetworkSenders::network_send`.
    pub network_recv: mpsc::UnboundedReceiver<NetworkMessage<E>>,
    // Receiver paired with `NetworkSenders::validator_subscription_send`.
    pub validator_subscription_recv: mpsc::Receiver<ValidatorSubscriptionMessage>,
}
|
|
|
|
impl<E: EthSpec> NetworkSenders<E> {
|
|
pub fn new() -> (Self, NetworkReceivers<E>) {
|
|
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage<E>>();
|
|
let (validator_subscription_send, validator_subscription_recv) =
|
|
mpsc::channel(VALIDATOR_SUBSCRIPTION_MESSAGE_QUEUE_SIZE);
|
|
let senders = Self {
|
|
network_send,
|
|
validator_subscription_send,
|
|
};
|
|
let receivers = NetworkReceivers {
|
|
network_recv,
|
|
validator_subscription_recv,
|
|
};
|
|
(senders, receivers)
|
|
}
|
|
|
|
pub fn network_send(&self) -> mpsc::UnboundedSender<NetworkMessage<E>> {
|
|
self.network_send.clone()
|
|
}
|
|
|
|
pub fn validator_subscription_send(&self) -> mpsc::Sender<ValidatorSubscriptionMessage> {
|
|
self.validator_subscription_send.clone()
|
|
}
|
|
}
|
|
|
|
/// Service that handles communication between internal services and the `lighthouse_network` network service.
pub struct NetworkService<T: BeaconChainTypes> {
    /// A reference to the underlying beacon chain.
    beacon_chain: Arc<BeaconChain<T>>,
    /// The underlying libp2p service that drives all the network interactions.
    libp2p: Network<T::EthSpec>,
    /// An attestation and sync committee subnet manager service.
    subnet_service: SubnetService<T>,
    /// The receiver channel for lighthouse to communicate with the network service.
    network_recv: mpsc::UnboundedReceiver<NetworkMessage<T::EthSpec>>,
    /// The receiver channel for lighthouse to send validator subscription requests.
    validator_subscription_recv: mpsc::Receiver<ValidatorSubscriptionMessage>,
    /// The sending channel for the network service to send messages to be routed throughout
    /// lighthouse.
    router_send: mpsc::UnboundedSender<RouterMessage<T::EthSpec>>,
    /// A reference to lighthouse's database to persist the DHT.
    store: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
    /// A collection of global variables, accessible outside of the network service.
    network_globals: Arc<NetworkGlobals<T::EthSpec>>,
    /// A delay that expires when a new fork takes place. `None` inside the
    /// `OptionFuture` disarms the timer.
    next_fork_update: Pin<Box<OptionFuture<Sleep>>>,
    /// A delay that expires when we need to subscribe to a new fork's topics.
    next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
    /// A delay that expires when we need to unsubscribe from old fork topics.
    next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
    /// Shutdown beacon node after sync is complete.
    shutdown_after_sync: bool,
    /// Whether metrics are enabled or not.
    metrics_enabled: bool,
    /// A timer for updating various network metrics.
    metrics_update: tokio::time::Interval,
    /// A timer for periodically updating gossipsub scoring parameters.
    gossipsub_parameter_update: tokio::time::Interval,
    /// Provides fork specific info.
    fork_context: Arc<ForkContext>,
}
|
|
|
|
impl<T: BeaconChainTypes> NetworkService<T> {
|
|
    /// Construct (but do not spawn) the network service.
    ///
    /// Builds the external communication channels, optionally sets up UPnP
    /// port-mapping, starts the underlying libp2p service, reloads the
    /// persisted DHT, and spawns the router and subnet services. Returns the
    /// service itself together with the shared network globals and the
    /// external sender handles, or an error string on failure.
    async fn build(
        beacon_chain: Arc<BeaconChain<T>>,
        config: Arc<NetworkConfig>,
        executor: task_executor::TaskExecutor,
        libp2p_registry: Option<&'_ mut Registry>,
        beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
    ) -> Result<
        (
            NetworkService<T>,
            Arc<NetworkGlobals<T::EthSpec>>,
            NetworkSenders<T::EthSpec>,
        ),
        String,
    > {
        // build the channels for external comms
        let (network_senders, network_receivers) = NetworkSenders::new();

        #[cfg(feature = "disable-backfill")]
        warn!("Backfill is disabled. DO NOT RUN IN PRODUCTION");

        // Attempt UPnP port-mapping only when it is enabled, discovery is
        // active, and an IPv4 listen address exists to map.
        if let (true, false, Some(v4)) = (
            config.upnp_enabled,
            config.disable_discovery,
            config.listen_addrs().v4(),
        ) {
            let v4 = v4.clone();
            executor.spawn(
                async move {
                    info!("UPnP Attempting to initialise routes");
                    if let Err(e) = nat::construct_upnp_mappings(v4.addr, v4.disc_port).await {
                        info!(error = %e, "Could not UPnP map Discovery port");
                    }
                },
                "UPnP",
            );
        }

        // get a reference to the beacon chain store
        let store = beacon_chain.store.clone();

        // build the current enr_fork_id for adding to our local ENR
        let enr_fork_id = beacon_chain.enr_fork_id();

        // keep track of when our fork_id needs to be updated
        let next_fork_update = Box::pin(next_fork_delay(&beacon_chain).into());
        let next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&beacon_chain).into());
        let next_unsubscribe = Box::pin(None.into());

        // `slot()` can fail (e.g. before genesis); fall back to the genesis slot.
        let current_slot = beacon_chain
            .slot()
            .unwrap_or(beacon_chain.spec.genesis_slot);

        // Create a fork context for the given config and genesis validators root
        let fork_context = Arc::new(ForkContext::new::<T::EthSpec>(
            current_slot,
            beacon_chain.genesis_validators_root,
            &beacon_chain.spec,
        ));

        debug!(fork_name = ?fork_context.current_fork(), "Current fork");

        // construct the libp2p service context
        let service_context = Context {
            config: config.clone(),
            enr_fork_id,
            fork_context: fork_context.clone(),
            chain_spec: beacon_chain.spec.clone(),
            libp2p_registry,
        };

        // launch libp2p service
        let (mut libp2p, network_globals) = Network::new(
            executor.clone(),
            service_context,
            beacon_chain
                .data_availability_checker
                .custody_context()
                .custody_group_count_at_head(&beacon_chain.spec),
        )
        .await?;

        // Repopulate the DHT with stored ENR's if discovery is not disabled.
        if !config.disable_discovery {
            let enrs_to_load = load_dht::<T::EthSpec, T::HotStore, T::ColdStore>(store.clone());
            debug!(
                peers = enrs_to_load.len(),
                "Loading peers into the routing table"
            );
            for enr in enrs_to_load {
                libp2p.add_enr(enr.clone());
            }
        }

        let invalid_block_storage = config
            .invalid_block_storage
            .clone()
            .map(InvalidBlockStorage::Enabled)
            .unwrap_or(InvalidBlockStorage::Disabled);

        // launch derived network services

        // router task
        let router_send = Router::spawn(
            beacon_chain.clone(),
            network_globals.clone(),
            network_senders.network_send(),
            executor.clone(),
            invalid_block_storage,
            beacon_processor_send,
            fork_context.clone(),
        )?;

        // attestation and sync committee subnet service
        let subnet_service = SubnetService::new(
            beacon_chain.clone(),
            network_globals.local_enr().node_id(),
            &config,
        );

        // create a timer for updating network metrics
        let metrics_update = tokio::time::interval(Duration::from_secs(METRIC_UPDATE_INTERVAL));

        // create a timer for updating gossipsub parameters
        let gossipsub_parameter_update = tokio::time::interval(Duration::from_secs(60));

        let NetworkReceivers {
            network_recv,
            validator_subscription_recv,
        } = network_receivers;

        // create the network service and spawn the task
        let network_service = NetworkService {
            beacon_chain,
            libp2p,
            subnet_service,
            network_recv,
            validator_subscription_recv,
            router_send,
            store,
            network_globals: network_globals.clone(),
            next_fork_update,
            next_fork_subscriptions,
            next_unsubscribe,
            shutdown_after_sync: config.shutdown_after_sync,
            metrics_enabled: config.metrics_enabled,
            metrics_update,
            gossipsub_parameter_update,
            fork_context,
        };

        Ok((network_service, network_globals, network_senders))
    }
|
|
|
|
#[allow(clippy::type_complexity)]
|
|
pub async fn start(
|
|
beacon_chain: Arc<BeaconChain<T>>,
|
|
config: Arc<NetworkConfig>,
|
|
executor: task_executor::TaskExecutor,
|
|
libp2p_registry: Option<&'_ mut Registry>,
|
|
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
|
|
) -> Result<(Arc<NetworkGlobals<T::EthSpec>>, NetworkSenders<T::EthSpec>), String> {
|
|
let (network_service, network_globals, network_senders) = Self::build(
|
|
beacon_chain,
|
|
config,
|
|
executor.clone(),
|
|
libp2p_registry,
|
|
beacon_processor_send,
|
|
)
|
|
.await?;
|
|
|
|
network_service.spawn_service(executor);
|
|
|
|
Ok((network_globals, network_senders))
|
|
}
|
|
|
|
/// Returns the required fork digests that gossipsub needs to subscribe to based on the current slot.
|
|
///
|
|
/// For `current_slot < fork_slot`, this function returns both the pre-fork and post-fork
|
|
/// digests since we should be subscribed to post fork topics before the fork.
|
|
pub fn required_gossip_fork_digests(&self) -> Vec<[u8; 4]> {
|
|
let fork_context = &self.fork_context;
|
|
let spec = &self.beacon_chain.spec;
|
|
let current_slot = self.beacon_chain.slot().unwrap_or(spec.genesis_slot);
|
|
let current_fork = fork_context.current_fork();
|
|
|
|
let mut result = vec![fork_context
|
|
.to_context_bytes(current_fork)
|
|
.unwrap_or_else(|| {
|
|
panic!(
|
|
"{} fork bytes should exist as it's initialized in ForkContext",
|
|
current_fork
|
|
)
|
|
})];
|
|
|
|
if let Some((next_fork, fork_epoch)) = spec.next_fork_epoch::<T::EthSpec>(current_slot) {
|
|
if current_slot.saturating_add(Slot::new(SUBSCRIBE_DELAY_SLOTS))
|
|
>= fork_epoch.start_slot(T::EthSpec::slots_per_epoch())
|
|
{
|
|
let next_fork_context_bytes =
|
|
fork_context.to_context_bytes(next_fork).unwrap_or_else(|| {
|
|
panic!(
|
|
"context bytes should exist as spec.next_fork_epoch({}) returned Some({})",
|
|
current_slot, next_fork
|
|
)
|
|
});
|
|
result.push(next_fork_context_bytes);
|
|
}
|
|
}
|
|
|
|
result
|
|
}
|
|
|
|
fn send_to_router(&mut self, msg: RouterMessage<T::EthSpec>) {
|
|
if let Err(mpsc::error::SendError(msg)) = self.router_send.send(msg) {
|
|
debug!(?msg, "Failed to send msg to router");
|
|
}
|
|
}
|
|
|
|
    /// Spawn the main network event loop on the given executor, consuming `self`.
    ///
    /// The loop multiplexes all input sources via `tokio::select!`: metrics and
    /// gossipsub-parameter timers, lighthouse messages, validator subscription
    /// requests, subnet service events, libp2p events, and the three fork
    /// timers. It runs until the task executor shuts it down.
    fn spawn_service(mut self, executor: task_executor::TaskExecutor) {
        let mut shutdown_sender = executor.shutdown_sender();

        // spawn on the current executor
        let service_fut = async move {
            loop {
                tokio::select! {
                    // Metrics tick is gated on the flag so disabled nodes pay no cost.
                    _ = self.metrics_update.tick(), if self.metrics_enabled => {
                        // update various network metrics
                        metrics::update_gossip_metrics::<T::EthSpec>(
                            self.libp2p.gossipsub(),
                            &self.network_globals,
                        );
                        // update sync metrics
                        metrics::update_sync_metrics(&self.network_globals);
                    }

                    _ = self.gossipsub_parameter_update.tick() => self.update_gossipsub_parameters(),

                    // handle a message sent to the network
                    Some(msg) = self.network_recv.recv() => self.on_network_msg(msg, &mut shutdown_sender).await,

                    // handle a message from a validator requesting a subscription to a subnet
                    Some(msg) = self.validator_subscription_recv.recv() => self.on_validator_subscription_msg(msg).await,

                    // process any subnet service events
                    Some(msg) = self.subnet_service.next() => self.on_subnet_service_msg(msg),

                    event = self.libp2p.next_event() => self.on_libp2p_event(event, &mut shutdown_sender).await,

                    Some(_) = &mut self.next_fork_update => self.update_next_fork(),

                    // Post-fork grace period expired: drop the old fork's topics.
                    Some(_) = &mut self.next_unsubscribe => {
                        let new_enr_fork_id = self.beacon_chain.enr_fork_id();
                        self.libp2p.unsubscribe_from_fork_topics_except(new_enr_fork_id.fork_digest);
                        info!("Unsubscribed from old fork topics");
                        // Disarm until the next fork transition re-arms it.
                        self.next_unsubscribe = Box::pin(None.into());
                    }

                    // Pre-fork window reached: join the upcoming fork's topics early.
                    Some(_) = &mut self.next_fork_subscriptions => {
                        if let Some((fork_name, _)) = self.beacon_chain.duration_to_next_fork() {
                            let fork_version = self.beacon_chain.spec.fork_version_for_name(fork_name);
                            let fork_digest = ChainSpec::compute_fork_digest(fork_version, self.beacon_chain.genesis_validators_root);
                            info!("Subscribing to new fork topics");
                            self.libp2p.subscribe_new_fork_topics(fork_name, fork_digest);
                            // Disarm; re-armed by `update_next_fork` at the transition.
                            self.next_fork_subscriptions = Box::pin(None.into());
                        }
                        else {
                            error!( "Fork subscription scheduled but no fork scheduled");
                        }
                    }
                }
            }
        }.instrument(info_span!("", service = "network"));
        executor.spawn(service_fut, "network");
    }
|
|
|
|
    /// Handle an event received from the network.
    ///
    /// Most events are forwarded to the router; losing all listeners triggers
    /// a node shutdown via `shutdown_sender`.
    async fn on_libp2p_event(
        &mut self,
        ev: NetworkEvent<T::EthSpec>,
        shutdown_sender: &mut Sender<ShutdownReason>,
    ) {
        match ev {
            NetworkEvent::PeerConnectedOutgoing(peer_id) => {
                // We dialed them, so initiate a status exchange.
                self.send_to_router(RouterMessage::StatusPeer(peer_id));
            }
            NetworkEvent::PeerConnectedIncoming(_) => {
                // No action required for this event.
            }
            NetworkEvent::PeerDisconnected(peer_id) => {
                self.send_to_router(RouterMessage::PeerDisconnected(peer_id));
            }
            NetworkEvent::PeerUpdatedCustodyGroupCount(peer_id) => {
                self.send_to_router(RouterMessage::PeerUpdatedCustodyGroupCount(peer_id));
            }
            NetworkEvent::RequestReceived {
                peer_id,
                inbound_request_id,
                request_type,
            } => {
                self.send_to_router(RouterMessage::RPCRequestReceived {
                    peer_id,
                    inbound_request_id,
                    request_type,
                });
            }
            NetworkEvent::ResponseReceived {
                peer_id,
                app_request_id,
                response,
            } => {
                self.send_to_router(RouterMessage::RPCResponseReceived {
                    peer_id,
                    app_request_id,
                    response,
                });
            }
            NetworkEvent::RPCFailed {
                app_request_id,
                peer_id,
                error,
            } => {
                self.send_to_router(RouterMessage::RPCFailed {
                    peer_id,
                    app_request_id,
                    error,
                });
            }
            NetworkEvent::StatusPeer(peer_id) => {
                self.send_to_router(RouterMessage::StatusPeer(peer_id));
            }
            NetworkEvent::PubsubMessage {
                id,
                source,
                message,
                ..
            } => {
                match message {
                    // attestation information gets processed in the attestation service
                    PubsubMessage::Attestation(ref subnet_and_attestation) => {
                        let subnet_id = subnet_and_attestation.0;
                        let attestation = &subnet_and_attestation.1;
                        // checks if we have an aggregator for the slot. If so, we should process
                        // the attestation, else we just propagate the Attestation.
                        let should_process = self.subnet_service.should_process_attestation(
                            Subnet::Attestation(subnet_id),
                            &attestation.data,
                        );
                        self.send_to_router(RouterMessage::PubsubMessage(
                            id,
                            source,
                            message,
                            should_process,
                        ));
                    }
                    _ => {
                        // all else is sent to the router
                        self.send_to_router(RouterMessage::PubsubMessage(
                            id, source, message, true,
                        ));
                    }
                }
            }
            NetworkEvent::NewListenAddr(multiaddr) => {
                // Record the new listen address in the shared globals.
                self.network_globals
                    .listen_multiaddrs
                    .write()
                    .push(multiaddr);
            }
            NetworkEvent::ZeroListeners => {
                // Without any listeners the node cannot function; request shutdown.
                let _ = shutdown_sender
                    .send(ShutdownReason::Failure(
                        "All listeners are closed. Unable to listen",
                    ))
                    .await
                    .map_err(|e| {
                        warn!(
                            error = %e,
                            "failed to send a shutdown signal"
                        )
                    });
            }
        }
    }
|
|
|
|
    /// Handle a message sent to the network service.
    ///
    /// Applies the requested action to the libp2p layer. `SubscribeCoreTopics`
    /// may instead trigger a clean shutdown when `--shutdown-after-sync` is set.
    async fn on_network_msg(
        &mut self,
        msg: NetworkMessage<T::EthSpec>,
        shutdown_sender: &mut Sender<ShutdownReason>,
    ) {
        // Record per-message-kind counters and processing time.
        metrics::inc_counter_vec(&metrics::NETWORK_RECEIVE_EVENTS, &[(&msg).into()]);
        let _timer = metrics::start_timer_vec(&metrics::NETWORK_RECEIVE_TIMES, &[(&msg).into()]);

        match msg {
            NetworkMessage::SendRequest {
                peer_id,
                request,
                app_request_id,
            } => {
                // A failed send is reported back to the router as an RPC failure.
                if let Err((app_request_id, error)) =
                    self.libp2p.send_request(peer_id, app_request_id, request)
                {
                    self.send_to_router(RouterMessage::RPCFailed {
                        peer_id,
                        app_request_id,
                        error,
                    });
                }
            }
            NetworkMessage::SendResponse {
                peer_id,
                inbound_request_id,
                response,
            } => {
                self.libp2p
                    .send_response(peer_id, inbound_request_id, response);
            }
            NetworkMessage::SendErrorResponse {
                peer_id,
                error,
                inbound_request_id,
                reason,
            } => {
                self.libp2p
                    .send_error_response(peer_id, inbound_request_id, error, reason);
            }
            NetworkMessage::ValidationResult {
                propagation_source,
                message_id,
                validation_result,
            } => {
                trace!( propagation_peer = ?propagation_source,
                    %message_id,
                    ?validation_result, "Propagating gossipsub message"
                );
                self.libp2p.report_message_validation_result(
                    &propagation_source,
                    message_id,
                    validation_result,
                );
            }
            NetworkMessage::Publish { messages } => {
                // Collect the distinct topic kinds, for logging only.
                let mut topic_kinds = Vec::new();
                for message in &messages {
                    if !topic_kinds.contains(&message.kind()) {
                        topic_kinds.push(message.kind());
                    }
                }
                debug!(
                    count = messages.len(),
                    topics = ?topic_kinds,
                    "Sending pubsub messages"
                );
                self.libp2p.publish(messages);
            }
            NetworkMessage::ReportPeer {
                peer_id,
                action,
                source,
                msg,
            } => self.libp2p.report_peer(&peer_id, action, source, msg),
            NetworkMessage::GoodbyePeer {
                peer_id,
                reason,
                source,
            } => self.libp2p.goodbye_peer(&peer_id, reason, source),
            NetworkMessage::ConnectTrustedPeer(enr) => {
                self.libp2p.dial_trusted_peer(enr);
            }
            NetworkMessage::DisconnectTrustedPeer(enr) => {
                self.libp2p.remove_trusted_peer(enr);
            }
            NetworkMessage::SubscribeCoreTopics => {
                // Already subscribed: nothing to do.
                if self.subscribed_core_topics() {
                    return;
                }

                // This message implies sync completed; honor --shutdown-after-sync.
                if self.shutdown_after_sync {
                    if let Err(e) = shutdown_sender
                        .send(ShutdownReason::Success(
                            "Beacon node completed sync. \
                            Shutting down as --shutdown-after-sync flag is enabled",
                        ))
                        .await
                    {
                        warn!(
                            error = %e,
                            "failed to send a shutdown signal"
                        )
                    }
                    return;
                }

                // Subscribe to each core topic on every required fork digest.
                let mut subscribed_topics: Vec<GossipTopic> = vec![];
                for topic_kind in core_topics_to_subscribe::<T::EthSpec>(
                    self.fork_context.current_fork(),
                    &self.network_globals.as_topic_config(),
                    &self.fork_context.spec,
                ) {
                    for fork_digest in self.required_gossip_fork_digests() {
                        let topic = GossipTopic::new(
                            topic_kind.clone(),
                            GossipEncoding::default(),
                            fork_digest,
                        );
                        if self.libp2p.subscribe(topic.clone()) {
                            subscribed_topics.push(topic);
                        } else {
                            warn!(%topic, "Could not subscribe to topic");
                        }
                    }
                }

                // If we are to subscribe to all subnets we do it here
                if self.network_globals.config.subscribe_all_subnets {
                    for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
                        let subnet = Subnet::Attestation(SubnetId::new(subnet_id));
                        // Update the ENR bitfield
                        self.libp2p.update_enr_subnet(subnet, true);
                    }
                    let subnet_max = <<T as BeaconChainTypes>::EthSpec as EthSpec>::SyncCommitteeSubnetCount::to_u64();
                    for subnet_id in 0..subnet_max {
                        let subnet = Subnet::SyncCommittee(SyncSubnetId::new(subnet_id));
                        // Update the ENR bitfield
                        self.libp2p.update_enr_subnet(subnet, true);
                    }
                }

                if !subscribed_topics.is_empty() {
                    info!(
                        topics = ?subscribed_topics.into_iter().map(|topic| format!("{}", topic)).collect::<Vec<_>>(),
                        "Subscribed to topics"
                    );
                }
            }
            NetworkMessage::CustodyCountChanged {
                new_custody_group_count,
                sampling_count,
            } => {
                // subscribe to `sampling_count` subnets
                self.libp2p
                    .subscribe_new_data_column_subnets(sampling_count);
                // A configured false custody count (testing) suppresses the ENR update.
                if self
                    .network_globals
                    .config
                    .advertise_false_custody_group_count
                    .is_none()
                {
                    self.libp2p.update_enr_cgc(new_custody_group_count);
                }
            }
        }
    }
|
|
|
|
/// Handle a message sent to the network service.
|
|
async fn on_validator_subscription_msg(&mut self, msg: ValidatorSubscriptionMessage) {
|
|
match msg {
|
|
ValidatorSubscriptionMessage::AttestationSubscribe { subscriptions } => {
|
|
let subscriptions = subscriptions.into_iter().map(Subscription::Attestation);
|
|
self.subnet_service.validator_subscriptions(subscriptions)
|
|
}
|
|
ValidatorSubscriptionMessage::SyncCommitteeSubscribe { subscriptions } => {
|
|
let subscriptions = subscriptions.into_iter().map(Subscription::SyncCommittee);
|
|
self.subnet_service.validator_subscriptions(subscriptions)
|
|
}
|
|
}
|
|
}
|
|
|
|
fn update_gossipsub_parameters(&mut self) {
|
|
if let Ok(slot) = self.beacon_chain.slot() {
|
|
let active_validators_opt = self
|
|
.beacon_chain
|
|
.canonical_head
|
|
.cached_head()
|
|
.active_validator_count();
|
|
if let Some(active_validators) = active_validators_opt {
|
|
if self
|
|
.libp2p
|
|
.update_gossipsub_parameters(active_validators, slot)
|
|
.is_err()
|
|
{
|
|
error!(active_validators, "Failed to update gossipsub parameters");
|
|
}
|
|
} else {
|
|
// This scenario will only happen if the caches on the cached canonical head aren't
|
|
// built. That should never be the case.
|
|
error!(
|
|
info = "please report this bug",
|
|
"Active validator count unavailable"
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
fn on_subnet_service_msg(&mut self, msg: SubnetServiceMessage) {
|
|
match msg {
|
|
SubnetServiceMessage::Subscribe(subnet) => {
|
|
for fork_digest in self.required_gossip_fork_digests() {
|
|
let topic =
|
|
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
|
|
self.libp2p.subscribe(topic);
|
|
}
|
|
}
|
|
SubnetServiceMessage::Unsubscribe(subnet) => {
|
|
for fork_digest in self.required_gossip_fork_digests() {
|
|
let topic =
|
|
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
|
|
self.libp2p.unsubscribe(topic);
|
|
}
|
|
}
|
|
SubnetServiceMessage::EnrAdd(subnet) => {
|
|
self.libp2p.update_enr_subnet(subnet, true);
|
|
}
|
|
SubnetServiceMessage::EnrRemove(sync_subnet_id) => {
|
|
self.libp2p
|
|
.update_enr_subnet(Subnet::SyncCommittee(sync_subnet_id), false);
|
|
}
|
|
SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => {
|
|
self.libp2p.discover_subnet_peers(subnets_to_discover);
|
|
}
|
|
}
|
|
}
|
|
|
|
    /// Called when the `next_fork_update` timer fires: transition the service
    /// to the new fork.
    ///
    /// Updates the fork context and the libp2p fork version, re-arms the fork
    /// timers, schedules unsubscription from the old fork's topics after
    /// `UNSUBSCRIBE_DELAY_EPOCHS`, and removes topic weight from the old
    /// topics so lingering peers are not penalized.
    fn update_next_fork(&mut self) {
        let new_enr_fork_id = self.beacon_chain.enr_fork_id();
        let new_fork_digest = new_enr_fork_id.fork_digest;

        let fork_context = &self.fork_context;
        if let Some(new_fork_name) = fork_context.from_context_bytes(new_fork_digest) {
            info!(
                old_fork = ?fork_context.current_fork(),
                new_fork = ?new_fork_name,
                "Transitioned to new fork"
            );
            fork_context.update_current_fork(*new_fork_name);

            self.libp2p.update_fork_version(new_enr_fork_id);
            // Reinitialize the next_fork_update
            self.next_fork_update = Box::pin(next_fork_delay(&self.beacon_chain).into());

            // Set the next_unsubscribe delay.
            let epoch_duration =
                self.beacon_chain.spec.seconds_per_slot * T::EthSpec::slots_per_epoch();
            let unsubscribe_delay = Duration::from_secs(UNSUBSCRIBE_DELAY_EPOCHS * epoch_duration);

            // Update the `next_fork_subscriptions` timer if the next fork is known.
            self.next_fork_subscriptions =
                Box::pin(next_fork_subscriptions_delay(&self.beacon_chain).into());
            self.next_unsubscribe = Box::pin(Some(tokio::time::sleep(unsubscribe_delay)).into());
            info!(
                remaining_epochs = UNSUBSCRIBE_DELAY_EPOCHS,
                "Network will unsubscribe from old fork gossip topics in a few epochs"
            );

            // Remove topic weight from old fork topics to prevent peers that left on the mesh on
            // old topics from being penalized for not sending us messages.
            self.libp2p.remove_topic_weight_except(new_fork_digest);
        } else {
            // A digest we cannot map back to a fork name indicates a serious
            // configuration/spec mismatch.
            crit!(new_fork_id = ?new_enr_fork_id, "Unknown new enr fork id");
        }
    }
|
|
|
|
fn subscribed_core_topics(&self) -> bool {
|
|
let core_topics = core_topics_to_subscribe::<T::EthSpec>(
|
|
self.fork_context.current_fork(),
|
|
&self.network_globals.as_topic_config(),
|
|
&self.fork_context.spec,
|
|
);
|
|
let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics);
|
|
let subscriptions = self.network_globals.gossipsub_subscriptions.read();
|
|
let subscribed_topics: HashSet<&GossipKind> =
|
|
subscriptions.iter().map(|topic| topic.kind()).collect();
|
|
|
|
core_topics.is_subset(&subscribed_topics)
|
|
}
|
|
}
|
|
|
|
/// Returns a `Sleep` that triggers after the next change in the beacon chain fork version.
|
|
/// If there is no scheduled fork, `None` is returned.
|
|
fn next_fork_delay<T: BeaconChainTypes>(
|
|
beacon_chain: &BeaconChain<T>,
|
|
) -> Option<tokio::time::Sleep> {
|
|
beacon_chain
|
|
.duration_to_next_fork()
|
|
.map(|(_, until_fork)| tokio::time::sleep(until_fork))
|
|
}
|
|
|
|
/// Returns a `Sleep` that triggers `SUBSCRIBE_DELAY_SLOTS` before the next fork.
|
|
/// Returns `None` if there are no scheduled forks or we are already past `current_slot + SUBSCRIBE_DELAY_SLOTS > fork_slot`.
|
|
fn next_fork_subscriptions_delay<T: BeaconChainTypes>(
|
|
beacon_chain: &BeaconChain<T>,
|
|
) -> Option<tokio::time::Sleep> {
|
|
if let Some((_, duration_to_fork)) = beacon_chain.duration_to_next_fork() {
|
|
let duration_to_subscription = duration_to_fork.saturating_sub(Duration::from_secs(
|
|
beacon_chain.spec.seconds_per_slot * SUBSCRIBE_DELAY_SLOTS,
|
|
));
|
|
if !duration_to_subscription.is_zero() {
|
|
return Some(tokio::time::sleep(duration_to_subscription));
|
|
}
|
|
}
|
|
None
|
|
}
|
|
|
|
impl<T: BeaconChainTypes> Drop for NetworkService<T> {
|
|
fn drop(&mut self) {
|
|
// network thread is terminating
|
|
let enrs = self.libp2p.enr_entries();
|
|
debug!(number_of_peers = enrs.len(), "Persisting DHT to store");
|
|
if let Err(e) = clear_dht::<T::EthSpec, T::HotStore, T::ColdStore>(self.store.clone()) {
|
|
error!(error = ?e, "Failed to clear old DHT entries");
|
|
}
|
|
// Still try to update new entries
|
|
match persist_dht::<T::EthSpec, T::HotStore, T::ColdStore>(self.store.clone(), enrs) {
|
|
Err(e) => error!(
|
|
error = ?e,
|
|
"Failed to persist DHT on drop"
|
|
),
|
|
Ok(_) => info!("Saved DHT state"),
|
|
}
|
|
info!("Network service shutdown");
|
|
}
|
|
}
|