Compute recent lightclient updates (#4969)
* Compute recent lightclient updates
* Review PR
* Merge remote-tracking branch 'upstream/unstable' into lc-prod-recent-updates
* Review PR
* consistent naming
* add metrics
* revert dropping reprocessing queue
* Update light client optimistic update re-processing logic. (#7)
* Add light client server simulator tests. Co-authored by @dapplion.
* Merge branch 'unstable' into fork/dapplion/lc-prod-recent-updates
* Fix lint
* Enable light client server in simulator test.
* Fix test for light client optimistic updates and finality updates.
@@ -1,4 +1,7 @@
 use crate::address_change_broadcast::broadcast_address_changes_at_capella;
+use crate::compute_light_client_updates::{
+    compute_light_client_updates, LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY,
+};
 use crate::config::{ClientGenesis, Config as ClientConfig};
 use crate::notifier::spawn_notifier;
 use crate::Client;
@@ -7,6 +10,7 @@ use beacon_chain::data_availability_checker::start_availability_cache_maintenanc
 use beacon_chain::otb_verification_service::start_otb_verification_service;
 use beacon_chain::proposer_prep_service::start_proposer_prep_service;
 use beacon_chain::schema_change::migrate_schema;
+use beacon_chain::LightClientProducerEvent;
 use beacon_chain::{
     builder::{BeaconChainBuilder, Witness},
     eth1_chain::{CachingEth1Backend, Eth1Chain},
@@ -24,6 +28,7 @@ use eth2::{
     BeaconNodeHttpClient, Error as ApiError, Timeouts,
 };
 use execution_layer::ExecutionLayer;
+use futures::channel::mpsc::Receiver;
 use genesis::{interop_genesis_state, Eth1GenesisService, DEFAULT_ETH1_BLOCK_HASH};
 use lighthouse_network::{prometheus_client::registry::Registry, NetworkGlobals};
 use monitoring_api::{MonitoringHttpClient, ProcessType};
@@ -83,6 +88,7 @@ pub struct ClientBuilder<T: BeaconChainTypes> {
     slasher: Option<Arc<Slasher<T::EthSpec>>>,
     beacon_processor_config: Option<BeaconProcessorConfig>,
     beacon_processor_channels: Option<BeaconProcessorChannels<T::EthSpec>>,
+    light_client_server_rv: Option<Receiver<LightClientProducerEvent<T::EthSpec>>>,
     eth_spec_instance: T::EthSpec,
 }
@@ -118,6 +124,7 @@ where
             eth_spec_instance,
             beacon_processor_config: None,
             beacon_processor_channels: None,
+            light_client_server_rv: None,
         }
     }
@@ -206,6 +213,16 @@ where
             builder
         };

+        let builder = if config.network.enable_light_client_server {
+            let (tx, rv) = futures::channel::mpsc::channel::<LightClientProducerEvent<TEthSpec>>(
+                LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY,
+            );
+            self.light_client_server_rv = Some(rv);
+            builder.light_client_server_tx(tx)
+        } else {
+            builder
+        };
+
         let chain_exists = builder.store_contains_beacon_chain().unwrap_or(false);

         // If the client is expect to resume but there's no beacon chain in the database,
@@ -797,7 +814,7 @@ where
             }
             .spawn_manager(
                 beacon_processor_channels.beacon_processor_rx,
-                beacon_processor_channels.work_reprocessing_tx,
+                beacon_processor_channels.work_reprocessing_tx.clone(),
                 beacon_processor_channels.work_reprocessing_rx,
                 None,
                 beacon_chain.slot_clock.clone(),
@@ -860,7 +877,7 @@ where
             }

             // Spawn a service to publish BLS to execution changes at the Capella fork.
-            if let Some(network_senders) = self.network_senders {
+            if let Some(network_senders) = self.network_senders.clone() {
                 let inner_chain = beacon_chain.clone();
                 let broadcast_context =
                     runtime_context.service_context("addr_bcast".to_string());
@@ -879,6 +896,26 @@ where
                 }
             }

+            // Spawn service to publish light_client updates at some interval into the slot.
+            if let Some(light_client_server_rv) = self.light_client_server_rv {
+                let inner_chain = beacon_chain.clone();
+                let light_client_update_context =
+                    runtime_context.service_context("lc_update".to_string());
+                let log = light_client_update_context.log().clone();
+                light_client_update_context.executor.spawn(
+                    async move {
+                        compute_light_client_updates(
+                            &inner_chain,
+                            light_client_server_rv,
+                            beacon_processor_channels.work_reprocessing_tx,
+                            &log,
+                        )
+                        .await
+                    },
+                    "lc_update",
+                );
+            }
+
             start_proposer_prep_service(runtime_context.executor.clone(), beacon_chain.clone());
             start_otb_verification_service(runtime_context.executor.clone(), beacon_chain.clone());
             start_availability_cache_maintenance_service(
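The builder changes above follow one pattern: a bounded futures mpsc channel whose sender is handed to the beacon chain builder (events are emitted from block import) while the receiver is kept on the client builder and later moved into a spawned consumer task. Below is a minimal, self-contained sketch of that wiring under simplifying assumptions; the `ParentRoot` alias and the `wire_up`/`consume` helpers are illustrative stand-ins, not Lighthouse APIs.

use futures::channel::mpsc;
use futures::StreamExt;

// Illustrative stand-in for the real `LightClientProducerEvent` payload.
type ParentRoot = [u8; 32];

const LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY: usize = 32;

// Producer side: create the channel only when the light client server is
// enabled, mirroring the `config.network.enable_light_client_server` branch.
fn wire_up(
    enabled: bool,
) -> (Option<mpsc::Sender<ParentRoot>>, Option<mpsc::Receiver<ParentRoot>>) {
    if enabled {
        let (tx, rv) = mpsc::channel::<ParentRoot>(LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY);
        // The real builder stores `rv` as `light_client_server_rv` and passes
        // `tx` to the beacon chain builder; here we just return both halves.
        (Some(tx), Some(rv))
    } else {
        (None, None)
    }
}

// Consumer side: processes events sequentially, as the spawned "lc_update"
// task does with `compute_light_client_updates`.
async fn consume(mut rv: mpsc::Receiver<ParentRoot>) {
    while let Some(parent_root) = rv.next().await {
        // ... recompute and cache light client updates for `parent_root` ...
        let _ = parent_root;
    }
}

In the real builder the consumer task is spawned on the executor only if the receiver was populated, which keeps the whole feature behind the enable_light_client_server flag.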
beacon_node/client/src/compute_light_client_updates.rs (new file, 39 lines)
@@ -0,0 +1,39 @@
+use beacon_chain::{BeaconChain, BeaconChainTypes, LightClientProducerEvent};
+use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage;
+use futures::channel::mpsc::Receiver;
+use futures::StreamExt;
+use slog::{error, Logger};
+use tokio::sync::mpsc::Sender;
+
+// Each `LightClientProducerEvent` is ~200 bytes. With the light_client server producing only recent
+// updates it is okay to drop some events in case of overloading. In normal network conditions
+// there's one event emitted per block at most every 12 seconds, while consuming the event should
+// take a few milliseconds. 32 is a small enough arbitrary number.
+pub(crate) const LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY: usize = 32;
+
+pub async fn compute_light_client_updates<T: BeaconChainTypes>(
+    chain: &BeaconChain<T>,
+    mut light_client_server_rv: Receiver<LightClientProducerEvent<T::EthSpec>>,
+    reprocess_tx: Sender<ReprocessQueueMessage>,
+    log: &Logger,
+) {
+    // Should only receive events for recent blocks, import_block filters by blocks close to clock.
+    //
+    // Intents to process SyncAggregates of all recent blocks sequentially, without skipping.
+    // Uses a bounded receiver, so may drop some SyncAggregates if very overloaded. This is okay
+    // since only the most recent updates have value.
+    while let Some(event) = light_client_server_rv.next().await {
+        let parent_root = event.0;
+
+        chain
+            .recompute_and_cache_light_client_updates(event)
+            .unwrap_or_else(|e| {
+                error!(log, "error computing light_client updates {:?}", e);
+            });
+
+        let msg = ReprocessQueueMessage::NewLightClientOptimisticUpdate { parent_root };
+        if reprocess_tx.try_send(msg).is_err() {
+            error!(log, "Failed to inform light client update"; "parent_root" => %parent_root)
+        };
+    }
+}
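The capacity comment in the new file relies on the bounded futures channel dropping events rather than blocking block import when the consumer falls behind. A small, self-contained sketch of that behaviour, using a toy u64 payload and a capacity of 2 instead of 32, and assuming the producer side (not shown in this diff) uses a non-blocking try_send:

use futures::channel::mpsc;

fn main() {
    // Toy bounded channel; the real one is created with
    // LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY (32).
    let (mut tx, _rv) = mpsc::channel::<u64>(2);

    for event in 0..5u64 {
        match tx.try_send(event) {
            Ok(()) => println!("queued event {event}"),
            // When the buffer is full the newest event is simply dropped,
            // which is acceptable because only recent updates have value.
            Err(e) if e.is_full() => println!("channel full, dropped event {event}"),
            Err(e) => println!("send failed: {e:?}"),
        }
    }
}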
@@ -1,6 +1,7 @@
 extern crate slog;

 mod address_change_broadcast;
+mod compute_light_client_updates;
 pub mod config;
 mod metrics;
 mod notifier;
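For completeness, here is a sketch of the non-blocking reprocess-queue handoff at the end of the loop in compute_light_client_updates.rs, using a hypothetical QueueMessage enum in place of ReprocessQueueMessage and stderr logging in place of slog:

use tokio::sync::mpsc;

// Hypothetical stand-in for beacon_processor's ReprocessQueueMessage.
#[derive(Debug)]
enum QueueMessage {
    NewLightClientOptimisticUpdate { parent_root: [u8; 32] },
}

#[tokio::main]
async fn main() {
    let (reprocess_tx, mut reprocess_rx) = mpsc::channel::<QueueMessage>(16);

    let parent_root = [0u8; 32];
    let msg = QueueMessage::NewLightClientOptimisticUpdate { parent_root };

    // `try_send` never blocks the update loop; if the queue is full the
    // failure is only logged, mirroring the error! call in the new module.
    if reprocess_tx.try_send(msg).is_err() {
        eprintln!("Failed to inform light client update");
    }

    // Receiver side, shown only to make the example runnable end to end.
    if let Some(QueueMessage::NewLightClientOptimisticUpdate { parent_root }) =
        reprocess_rx.recv().await
    {
        println!("reprocess optimistic update, parent_root prefix: {:?}", &parent_root[..4]);
    }
}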