From e9112875acb5a841e72eb1d96cd7b8183b7cfee3 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 3 Jul 2023 17:00:24 +1000 Subject: [PATCH] Start wiring up beacon processor --- Cargo.lock | 1 + beacon_node/client/src/builder.rs | 2 ++ beacon_node/http_api/Cargo.toml | 1 + beacon_node/http_api/src/lib.rs | 2 ++ beacon_node/http_api/src/test_utils.rs | 26 +++++++++++++++++++++++++- 5 files changed, 31 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 532f7ff204..ba82b20024 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3514,6 +3514,7 @@ name = "http_api" version = "0.1.0" dependencies = [ "beacon_chain", + "beacon_processor", "bs58", "directory", "environment", diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index f3ed7f65a9..cb652884e1 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -493,6 +493,7 @@ where chain: None, network_senders: None, network_globals: None, + beacon_processor_send: None, eth1_service: Some(genesis_service.eth1_service.clone()), log: context.log().clone(), sse_logging_components: runtime_context.sse_logging_components.clone(), @@ -718,6 +719,7 @@ where network_senders: self.network_senders.clone(), network_globals: self.network_globals.clone(), eth1_service: self.eth1_service.clone(), + beacon_processor_send: Some(self.beacon_processor_send.clone()), sse_logging_components: runtime_context.sse_logging_components.clone(), log: log.clone(), }); diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 2b117b26ce..b7c6f9e94b 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -42,6 +42,7 @@ operation_pool = { path = "../operation_pool" } sensitive_url = { path = "../../common/sensitive_url" } unused_port = {path = "../../common/unused_port"} store = { path = "../store" } +beacon_processor = { path = "../beacon_processor" } [dev-dependencies] environment = { path = "../../lighthouse/environment" } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 27bcc4d8a1..3fff4e510d 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -28,6 +28,7 @@ use beacon_chain::{ validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, ProduceBlockVerification, WhenSlotSkipped, }; +use beacon_processor::BeaconProcessorSend; pub use block_id::BlockId; use directory::DEFAULT_ROOT_DIR; use eth2::types::{ @@ -110,6 +111,7 @@ pub struct Context { pub chain: Option>>, pub network_senders: Option>, pub network_globals: Option>>, + pub beacon_processor_send: Option>, pub eth1_service: Option, pub sse_logging_components: Option, pub log: Logger, diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index 9880a8ca61..d2770b5608 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -5,6 +5,9 @@ use beacon_chain::{ }, BeaconChain, BeaconChainTypes, }; +use beacon_processor::{ + BeaconProcessor, BeaconProcessorSend, MAX_SCHEDULED_WORK_QUEUE_LEN, MAX_WORK_EVENT_QUEUE_LEN, +}; use directory::DEFAULT_ROOT_DIR; use eth2::{BeaconNodeHttpClient, Timeouts}; use lighthouse_network::{ @@ -29,7 +32,8 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use std::time::Duration; use store::MemoryStore; -use tokio::sync::oneshot; +use task_executor::test_utils::TestRuntime; +use tokio::sync::{mpsc, oneshot}; use types::{ChainSpec, EthSpec}; pub const TCP_PORT: u16 = 42; @@ -183,6 +187,25 @@ pub async fn create_api_server_on_port( let eth1_service = eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()).unwrap(); + let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN); + let beacon_processor_send = BeaconProcessorSend(beacon_processor_tx); + let (work_reprocessing_tx, work_reprocessing_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN); + BeaconProcessor { + network_globals: network_globals.clone(), + executor: TestRuntime::default().task_executor.clone(), + max_workers: 1, // Single-threaded beacon processor. + current_workers: 0, + enable_backfill_rate_limiting: chain.config.enable_backfill_rate_limiting, + log: log.clone(), + } + .spawn_manager( + beacon_processor_rx, + work_reprocessing_tx, + work_reprocessing_rx, + None, + chain.slot_clock.clone(), + ); + let ctx = Arc::new(Context { config: Config { enabled: true, @@ -197,6 +220,7 @@ pub async fn create_api_server_on_port( chain: Some(chain), network_senders: Some(network_senders), network_globals: Some(network_globals), + beacon_processor_send: Some(beacon_processor_send), eth1_service: Some(eth1_service), sse_logging_components: None, log,