Merge Engines and Engine struct in one in the execution_layer crate (#3284)

## Issue Addressed

Part of https://github.com/sigp/lighthouse/issues/3118, continuation of https://github.com/sigp/lighthouse/pull/3257 and https://github.com/sigp/lighthouse/pull/3283

## Proposed Changes
- Merge the [`Engines`](9c429d0764/beacon_node/execution_layer/src/engines.rs (L161-L165)) struct and [`Engine` ](9c429d0764/beacon_node/execution_layer/src/engines.rs (L62-L67))
- Remove unnecessary generics 

## Additional Info
There is more cleanup to do that will come in subsequent PRs
This commit is contained in:
Divma
2022-07-11 01:44:41 +00:00
parent 5dbfb37d74
commit 6d42a09ff8
4 changed files with 52 additions and 77 deletions

View File

@@ -1,7 +1,7 @@
//! Provides generic behaviour for multiple execution engines, specifically fallback behaviour.
use crate::engine_api::{
EngineApi, Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, PayloadId,
Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, PayloadId,
};
use crate::HttpJsonRpc;
use lru::LruCache;
@@ -55,20 +55,32 @@ struct PayloadIdCacheKey {
pub suggested_fee_recipient: Address,
}
/// An execution engine.
pub struct Engine<T> {
pub api: HttpJsonRpc<T>,
payload_id_cache: Mutex<LruCache<PayloadIdCacheKey, PayloadId>>,
state: RwLock<EngineState>,
#[derive(Debug)]
pub enum EngineError {
Offline,
Api { error: EngineApiError },
BuilderApi { error: EngineApiError },
Auth,
}
impl<T> Engine<T> {
/// An execution engine.
pub struct Engine {
pub api: HttpJsonRpc,
payload_id_cache: Mutex<LruCache<PayloadIdCacheKey, PayloadId>>,
state: RwLock<EngineState>,
pub latest_forkchoice_state: RwLock<Option<ForkChoiceState>>,
pub log: Logger,
}
impl Engine {
/// Creates a new, offline engine.
pub fn new(api: HttpJsonRpc<T>) -> Self {
pub fn new(api: HttpJsonRpc, log: &Logger) -> Self {
Self {
api,
payload_id_cache: Mutex::new(LruCache::new(PAYLOAD_ID_LRU_CACHE_SIZE)),
state: RwLock::new(EngineState::Offline),
latest_forkchoice_state: Default::default(),
log: log.clone(),
}
}
@@ -90,9 +102,7 @@ impl<T> Engine<T> {
})
.cloned()
}
}
impl Engine<EngineApi> {
pub async fn notify_forkchoice_updated(
&self,
forkchoice_state: ForkChoiceState,
@@ -120,26 +130,7 @@ impl Engine<EngineApi> {
Ok(response)
}
}
// This structure used to hold multiple execution engines managed in a fallback manner. This
// functionality has been removed following https://github.com/sigp/lighthouse/issues/3118 and this
// struct will likely be removed in the future.
pub struct Engines {
pub engine: Engine<EngineApi>,
pub latest_forkchoice_state: RwLock<Option<ForkChoiceState>>,
pub log: Logger,
}
#[derive(Debug)]
pub enum EngineError {
Offline,
Api { error: EngineApiError },
BuilderApi { error: EngineApiError },
Auth,
}
impl Engines {
async fn get_latest_forkchoice_state(&self) -> Option<ForkChoiceState> {
*self.latest_forkchoice_state.read().await
}
@@ -169,12 +160,7 @@ impl Engines {
// For simplicity, payload attributes are never included in this call. It may be
// reasonable to include them in the future.
if let Err(e) = self
.engine
.api
.forkchoice_updated_v1(forkchoice_state, None)
.await
{
if let Err(e) = self.api.forkchoice_updated_v1(forkchoice_state, None).await {
debug!(
self.log,
"Failed to issue latest head to engine";
@@ -191,14 +177,14 @@ impl Engines {
/// Returns `true` if the engine has a "synced" status.
pub async fn is_synced(&self) -> bool {
*self.engine.state.read().await == EngineState::Synced
*self.state.read().await == EngineState::Synced
}
/// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This
/// might be used to recover the node if offline.
pub async fn upcheck_not_synced(&self, logging: Logging) {
let mut state_lock = self.engine.state.write().await;
let mut state_lock = self.state.write().await;
if *state_lock != EngineState::Synced {
match self.engine.api.upcheck().await {
match self.api.upcheck().await {
Ok(()) => {
if logging.is_enabled() {
info!(
@@ -261,7 +247,7 @@ impl Engines {
/// upcheck it and then run the function again.
pub async fn first_success<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine<EngineApi>) -> G + Copy,
F: Fn(&'a Engine) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
match self.first_success_without_retry(func).await {
@@ -282,18 +268,18 @@ impl Engines {
func: F,
) -> Result<H, EngineError>
where
F: Fn(&'a Engine<EngineApi>) -> G,
F: Fn(&'a Engine) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let (engine_synced, engine_auth_failed) = {
let state = self.engine.state.read().await;
let state = self.state.read().await;
(
*state == EngineState::Synced,
*state == EngineState::AuthFailed,
)
};
if engine_synced {
match func(&self.engine).await {
match func(self).await {
Ok(result) => Ok(result),
Err(error) => {
debug!(
@@ -301,7 +287,7 @@ impl Engines {
"Execution engine call failed";
"error" => ?error,
);
*self.engine.state.write().await = EngineState::Offline;
*self.state.write().await = EngineState::Offline;
Err(EngineError::Api { error })
}
}
@@ -318,7 +304,7 @@ impl Engines {
/// it runs, it will try to upcheck all offline nodes and then run the function again.
pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine<EngineApi>) -> G + Copy,
F: Fn(&'a Engine) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
match self.broadcast_without_retry(func).await {
@@ -333,14 +319,14 @@ impl Engines {
/// Runs `func` on the node if it's last state is not offline.
pub async fn broadcast_without_retry<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine<EngineApi>) -> G,
F: Fn(&'a Engine) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let func = &func;
if *self.engine.state.read().await == EngineState::Offline {
if *self.state.read().await == EngineState::Offline {
Err(EngineError::Offline)
} else {
match func(&self.engine).await {
match func(self).await {
Ok(res) => Ok(res),
Err(error) => {
debug!(
@@ -348,7 +334,7 @@ impl Engines {
"Execution engine call failed";
"error" => ?error,
);
*self.engine.state.write().await = EngineState::Offline;
*self.state.write().await = EngineState::Offline;
Err(EngineError::Api { error })
}
}