Start wiring up beacon processor

This commit is contained in:
Paul Hauner
2023-07-03 17:00:24 +10:00
parent 238837bc91
commit e9112875ac
5 changed files with 31 additions and 1 deletions

View File

@@ -42,6 +42,7 @@ operation_pool = { path = "../operation_pool" }
sensitive_url = { path = "../../common/sensitive_url" }
unused_port = {path = "../../common/unused_port"}
store = { path = "../store" }
beacon_processor = { path = "../beacon_processor" }
[dev-dependencies]
environment = { path = "../../lighthouse/environment" }

View File

@@ -28,6 +28,7 @@ use beacon_chain::{
validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError,
BeaconChainTypes, ProduceBlockVerification, WhenSlotSkipped,
};
use beacon_processor::BeaconProcessorSend;
pub use block_id::BlockId;
use directory::DEFAULT_ROOT_DIR;
use eth2::types::{
@@ -110,6 +111,7 @@ pub struct Context<T: BeaconChainTypes> {
pub chain: Option<Arc<BeaconChain<T>>>,
pub network_senders: Option<NetworkSenders<T::EthSpec>>,
pub network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
pub beacon_processor_send: Option<BeaconProcessorSend<T::EthSpec>>,
pub eth1_service: Option<eth1::Service>,
pub sse_logging_components: Option<SSELoggingComponents>,
pub log: Logger,

View File

@@ -5,6 +5,9 @@ use beacon_chain::{
},
BeaconChain, BeaconChainTypes,
};
use beacon_processor::{
BeaconProcessor, BeaconProcessorSend, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN,
};
use directory::DEFAULT_ROOT_DIR;
use eth2::{BeaconNodeHttpClient, Timeouts};
use lighthouse_network::{
@@ -29,7 +32,8 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use store::MemoryStore;
use tokio::sync::oneshot;
use task_executor::test_utils::TestRuntime;
use tokio::sync::{mpsc, oneshot};
use types::{ChainSpec, EthSpec};
pub const TCP_PORT: u16 = 42;
@@ -183,6 +187,25 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
let eth1_service =
eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()).unwrap();
let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN);
let beacon_processor_send = BeaconProcessorSend(beacon_processor_tx);
let (work_reprocessing_tx, work_reprocessing_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
BeaconProcessor {
network_globals: network_globals.clone(),
executor: TestRuntime::default().task_executor.clone(),
max_workers: 1, // Single-threaded beacon processor.
current_workers: 0,
enable_backfill_rate_limiting: chain.config.enable_backfill_rate_limiting,
log: log.clone(),
}
.spawn_manager(
beacon_processor_rx,
work_reprocessing_tx,
work_reprocessing_rx,
None,
chain.slot_clock.clone(),
);
let ctx = Arc::new(Context {
config: Config {
enabled: true,
@@ -197,6 +220,7 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
chain: Some(chain),
network_senders: Some(network_senders),
network_globals: Some(network_globals),
beacon_processor_send: Some(beacon_processor_send),
eth1_service: Some(eth1_service),
sse_logging_components: None,
log,