Begin threading execution layer into BeaconChain

This commit is contained in:
Paul Hauner
2021-09-25 09:53:17 +10:00
parent 74a25cebdb
commit 4fe318c2e5
7 changed files with 34 additions and 25 deletions

3
Cargo.lock generated
View File

@@ -457,6 +457,7 @@ dependencies = [
"eth2_ssz", "eth2_ssz",
"eth2_ssz_derive", "eth2_ssz_derive",
"eth2_ssz_types", "eth2_ssz_types",
"execution_layer",
"exit-future", "exit-future",
"fork_choice", "fork_choice",
"futures", "futures",
@@ -2076,10 +2077,10 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"slog", "slog",
"task_executor",
"tokio", "tokio",
"types", "types",
"warp", "warp",
"warp_utils",
] ]
[[package]] [[package]]

View File

@@ -63,3 +63,4 @@ exit-future = "0.2.0"
slasher = { path = "../../slasher" } slasher = { path = "../../slasher" }
eth2 = { path = "../../common/eth2" } eth2 = { path = "../../common/eth2" }
strum = { version = "0.21.0", features = ["derive"] } strum = { version = "0.21.0", features = ["derive"] }
execution_layer = { path = "../execution_layer" }

View File

@@ -46,6 +46,7 @@ use crate::BeaconForkChoiceStore;
use crate::BeaconSnapshot; use crate::BeaconSnapshot;
use crate::{metrics, BeaconChainError}; use crate::{metrics, BeaconChainError};
use eth2::types::{EventKind, SseBlock, SseChainReorg, SseFinalizedCheckpoint, SseHead, SyncDuty}; use eth2::types::{EventKind, SseBlock, SseChainReorg, SseFinalizedCheckpoint, SseHead, SyncDuty};
use execution_layer::ExecutionLayer;
use fork_choice::ForkChoice; use fork_choice::ForkChoice;
use futures::channel::mpsc::Sender; use futures::channel::mpsc::Sender;
use itertools::process_results; use itertools::process_results;
@@ -272,6 +273,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
Mutex<ObservedOperations<AttesterSlashing<T::EthSpec>, T::EthSpec>>, Mutex<ObservedOperations<AttesterSlashing<T::EthSpec>, T::EthSpec>>,
/// Provides information from the Ethereum 1 (PoW) chain. /// Provides information from the Ethereum 1 (PoW) chain.
pub eth1_chain: Option<Eth1Chain<T::Eth1Chain, T::EthSpec>>, pub eth1_chain: Option<Eth1Chain<T::Eth1Chain, T::EthSpec>>,
/// Interfaces with the execution client.
pub execution_layer: Option<ExecutionLayer>,
/// Stores a "snapshot" of the chain at the time the head-of-the-chain block was received. /// Stores a "snapshot" of the chain at the time the head-of-the-chain block was received.
pub(crate) canonical_head: TimeoutRwLock<BeaconSnapshot<T::EthSpec>>, pub(crate) canonical_head: TimeoutRwLock<BeaconSnapshot<T::EthSpec>>,
/// The root of the genesis block. /// The root of the genesis block.

View File

@@ -15,6 +15,7 @@ use crate::{
Eth1ChainBackend, ServerSentEventHandler, Eth1ChainBackend, ServerSentEventHandler,
}; };
use eth1::Config as Eth1Config; use eth1::Config as Eth1Config;
use execution_layer::ExecutionLayer;
use fork_choice::ForkChoice; use fork_choice::ForkChoice;
use futures::channel::mpsc::Sender; use futures::channel::mpsc::Sender;
use operation_pool::{OperationPool, PersistedOperationPool}; use operation_pool::{OperationPool, PersistedOperationPool};
@@ -75,6 +76,7 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
>, >,
op_pool: Option<OperationPool<T::EthSpec>>, op_pool: Option<OperationPool<T::EthSpec>>,
eth1_chain: Option<Eth1Chain<T::Eth1Chain, T::EthSpec>>, eth1_chain: Option<Eth1Chain<T::Eth1Chain, T::EthSpec>>,
execution_layer: Option<ExecutionLayer>,
event_handler: Option<ServerSentEventHandler<T::EthSpec>>, event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
slot_clock: Option<T::SlotClock>, slot_clock: Option<T::SlotClock>,
shutdown_sender: Option<Sender<ShutdownReason>>, shutdown_sender: Option<Sender<ShutdownReason>>,
@@ -112,6 +114,7 @@ where
fork_choice: None, fork_choice: None,
op_pool: None, op_pool: None,
eth1_chain: None, eth1_chain: None,
execution_layer: None,
event_handler: None, event_handler: None,
slot_clock: None, slot_clock: None,
shutdown_sender: None, shutdown_sender: None,
@@ -470,6 +473,12 @@ where
self self
} }
/// Sets the `BeaconChain` execution layer.
pub fn execution_layer(mut self, execution_layer: Option<ExecutionLayer>) -> Self {
self.execution_layer = execution_layer;
self
}
/// Sets the `BeaconChain` event handler backend. /// Sets the `BeaconChain` event handler backend.
/// ///
/// For example, provide `ServerSentEventHandler` as a `handler`. /// For example, provide `ServerSentEventHandler` as a `handler`.
@@ -711,6 +720,7 @@ where
observed_proposer_slashings: <_>::default(), observed_proposer_slashings: <_>::default(),
observed_attester_slashings: <_>::default(), observed_attester_slashings: <_>::default(),
eth1_chain: self.eth1_chain, eth1_chain: self.eth1_chain,
execution_layer: self.execution_layer,
genesis_validators_root: canonical_head.beacon_state.genesis_validators_root(), genesis_validators_root: canonical_head.beacon_state.genesis_validators_root(),
canonical_head: TimeoutRwLock::new(canonical_head.clone()), canonical_head: TimeoutRwLock::new(canonical_head.clone()),
genesis_block_root, genesis_block_root,

View File

@@ -18,6 +18,6 @@ serde_json = "1.0.58"
serde = { version = "1.0.116", features = ["derive"] } serde = { version = "1.0.116", features = ["derive"] }
eth1 = { path = "../eth1" } eth1 = { path = "../eth1" }
warp = { git = "https://github.com/paulhauner/warp ", branch = "cors-wildcard" } warp = { git = "https://github.com/paulhauner/warp ", branch = "cors-wildcard" }
warp_utils = { path = "../../common/warp_utils" }
environment = { path = "../../lighthouse/environment" } environment = { path = "../../lighthouse/environment" }
bytes = "1.1.0" bytes = "1.1.0"
task_executor = { path = "../../common/task_executor" }

View File

@@ -1,7 +1,10 @@
use engine_api::{http::HttpJsonRpc, Error as ApiError, *}; use engine_api::{Error as ApiError, *};
use engines::{Engine, EngineError, Engines}; use engines::{Engine, EngineError, Engines};
use sensitive_url::SensitiveUrl; use sensitive_url::SensitiveUrl;
use slog::Logger; use slog::Logger;
use task_executor::TaskExecutor;
pub use engine_api::http::HttpJsonRpc;
mod engine_api; mod engine_api;
mod engines; mod engines;
@@ -19,12 +22,18 @@ impl From<ApiError> for Error {
} }
} }
pub struct ExecutionLayer<T> { pub struct ExecutionLayer {
engines: Engines<T>, engines: Engines<HttpJsonRpc>,
/// Allows callers to execute async tasks in a non-async environment, if they desire.
pub executor: TaskExecutor,
} }
impl ExecutionLayer<HttpJsonRpc> { impl ExecutionLayer {
pub fn from_urls(urls: Vec<SensitiveUrl>, log: Logger) -> Result<Self, Error> { pub fn from_urls(
urls: Vec<SensitiveUrl>,
executor: TaskExecutor,
log: Logger,
) -> Result<Self, Error> {
let engines = urls let engines = urls
.into_iter() .into_iter()
.map(|url| { .map(|url| {
@@ -36,11 +45,12 @@ impl ExecutionLayer<HttpJsonRpc> {
Ok(Self { Ok(Self {
engines: Engines { engines, log }, engines: Engines { engines, log },
executor,
}) })
} }
} }
impl<T: EngineApi> ExecutionLayer<T> { impl ExecutionLayer {
pub async fn prepare_payload( pub async fn prepare_payload(
&self, &self,
parent_hash: Hash256, parent_hash: Hash256,

View File

@@ -95,7 +95,6 @@ pub struct Context<T> {
pub struct Config { pub struct Config {
pub listen_addr: Ipv4Addr, pub listen_addr: Ipv4Addr,
pub listen_port: u16, pub listen_port: u16,
pub allow_origin: Option<String>,
} }
impl Default for Config { impl Default for Config {
@@ -103,7 +102,6 @@ impl Default for Config {
Self { Self {
listen_addr: Ipv4Addr::new(127, 0, 0, 1), listen_addr: Ipv4Addr::new(127, 0, 0, 1),
listen_port: 0, listen_port: 0,
allow_origin: None,
} }
} }
} }
@@ -130,19 +128,6 @@ pub fn serve<T: EthSpec>(
let config = &ctx.config; let config = &ctx.config;
let log = ctx.log.clone(); let log = ctx.log.clone();
// Configure CORS.
let cors_builder = {
let builder = warp::cors()
.allow_method("GET")
.allow_headers(vec!["Content-Type"]);
warp_utils::cors::set_builder_origins(
builder,
config.allow_origin.as_deref(),
(config.listen_addr, config.listen_port),
)?
};
let inner_ctx = ctx.clone(); let inner_ctx = ctx.clone();
let routes = warp::post() let routes = warp::post()
.and(warp::path("echo")) .and(warp::path("echo"))
@@ -155,8 +140,7 @@ pub fn serve<T: EthSpec>(
) )
}) })
// Add a `Server` header. // Add a `Server` header.
.map(|reply| warp::reply::with_header(reply, "Server", "lighthouse-mock-execution-client")) .map(|reply| warp::reply::with_header(reply, "Server", "lighthouse-mock-execution-client"));
.with(cors_builder.build());
let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown( let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown(
SocketAddrV4::new(config.listen_addr, config.listen_port), SocketAddrV4::new(config.listen_addr, config.listen_port),