mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-19 22:08:30 +00:00
Restore Log on Error & Spawn Blocking in Streamer (#5585)
* Restore Logging in Error Cases * Use Spawn Blocking for Loading Blocks in Streamer * Merge remote-tracking branch 'upstream/unstable' into request_logging_spawn_blocking * Address Sean's Comments * save a clone
This commit is contained in:
@@ -1,10 +1,9 @@
|
||||
use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes};
|
||||
use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1};
|
||||
use slog::{crit, debug, Logger};
|
||||
use slog::{crit, debug, error, Logger};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use store::{DatabaseBlock, ExecutionPayloadDeneb};
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::sync::{
|
||||
mpsc::{self, UnboundedSender},
|
||||
RwLock,
|
||||
@@ -395,18 +394,18 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
|
||||
pub fn new(
|
||||
beacon_chain: &Arc<BeaconChain<T>>,
|
||||
check_caches: CheckCaches,
|
||||
) -> Result<Self, BeaconChainError> {
|
||||
) -> Result<Arc<Self>, BeaconChainError> {
|
||||
let execution_layer = beacon_chain
|
||||
.execution_layer
|
||||
.as_ref()
|
||||
.ok_or(BeaconChainError::ExecutionLayerMissing)?
|
||||
.clone();
|
||||
|
||||
Ok(Self {
|
||||
Ok(Arc::new(Self {
|
||||
execution_layer,
|
||||
check_caches,
|
||||
beacon_chain: beacon_chain.clone(),
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
|
||||
@@ -425,30 +424,44 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
|
||||
}
|
||||
}
|
||||
|
||||
fn load_payloads(&self, block_roots: Vec<Hash256>) -> Vec<(Hash256, LoadResult<T::EthSpec>)> {
|
||||
let mut db_blocks = Vec::new();
|
||||
|
||||
for root in block_roots {
|
||||
if let Some(cached_block) = self.check_caches(root).map(LoadedBeaconBlock::Full) {
|
||||
db_blocks.push((root, Ok(Some(cached_block))));
|
||||
continue;
|
||||
}
|
||||
|
||||
match self.beacon_chain.store.try_get_full_block(&root) {
|
||||
Err(e) => db_blocks.push((root, Err(e.into()))),
|
||||
Ok(opt_block) => db_blocks.push((
|
||||
root,
|
||||
Ok(opt_block.map(|db_block| match db_block {
|
||||
DatabaseBlock::Full(block) => LoadedBeaconBlock::Full(Arc::new(block)),
|
||||
DatabaseBlock::Blinded(block) => {
|
||||
LoadedBeaconBlock::Blinded(Box::new(block))
|
||||
async fn load_payloads(
|
||||
self: &Arc<Self>,
|
||||
block_roots: Vec<Hash256>,
|
||||
) -> Result<Vec<(Hash256, LoadResult<T::EthSpec>)>, BeaconChainError> {
|
||||
let streamer = self.clone();
|
||||
// Loading from the DB is slow -> spawn a blocking task
|
||||
self.beacon_chain
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let mut db_blocks = Vec::new();
|
||||
for root in block_roots {
|
||||
if let Some(cached_block) =
|
||||
streamer.check_caches(root).map(LoadedBeaconBlock::Full)
|
||||
{
|
||||
db_blocks.push((root, Ok(Some(cached_block))));
|
||||
continue;
|
||||
}
|
||||
})),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
db_blocks
|
||||
match streamer.beacon_chain.store.try_get_full_block(&root) {
|
||||
Err(e) => db_blocks.push((root, Err(e.into()))),
|
||||
Ok(opt_block) => db_blocks.push((
|
||||
root,
|
||||
Ok(opt_block.map(|db_block| match db_block {
|
||||
DatabaseBlock::Full(block) => {
|
||||
LoadedBeaconBlock::Full(Arc::new(block))
|
||||
}
|
||||
DatabaseBlock::Blinded(block) => {
|
||||
LoadedBeaconBlock::Blinded(Box::new(block))
|
||||
}
|
||||
})),
|
||||
)),
|
||||
}
|
||||
}
|
||||
db_blocks
|
||||
},
|
||||
"load_beacon_blocks",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Pre-process the loaded blocks into execution engine requests.
|
||||
@@ -549,7 +562,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
|
||||
|
||||
// used when the execution engine doesn't support the payload bodies methods
|
||||
async fn stream_blocks_fallback(
|
||||
&self,
|
||||
self: Arc<Self>,
|
||||
block_roots: Vec<Hash256>,
|
||||
sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
|
||||
) {
|
||||
@@ -575,7 +588,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
|
||||
}
|
||||
|
||||
async fn stream_blocks(
|
||||
&self,
|
||||
self: Arc<Self>,
|
||||
block_roots: Vec<Hash256>,
|
||||
sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
|
||||
) {
|
||||
@@ -584,7 +597,17 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
|
||||
let mut n_sent = 0usize;
|
||||
let mut engine_requests = 0usize;
|
||||
|
||||
let payloads = self.load_payloads(block_roots);
|
||||
let payloads = match self.load_payloads(block_roots).await {
|
||||
Ok(payloads) => payloads,
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.beacon_chain.log,
|
||||
"BeaconBlockStreamer: Failed to load payloads";
|
||||
"error" => ?e
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let requests = self.get_requests(payloads).await;
|
||||
|
||||
for (root, request) in requests {
|
||||
@@ -624,7 +647,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
|
||||
}
|
||||
|
||||
pub async fn stream(
|
||||
self,
|
||||
self: Arc<Self>,
|
||||
block_roots: Vec<Hash256>,
|
||||
sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
|
||||
) {
|
||||
@@ -650,9 +673,8 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
|
||||
}
|
||||
|
||||
pub fn launch_stream(
|
||||
self,
|
||||
self: Arc<Self>,
|
||||
block_roots: Vec<Hash256>,
|
||||
executor: &TaskExecutor,
|
||||
) -> impl Stream<Item = (Hash256, Arc<BlockResult<T::EthSpec>>)> {
|
||||
let (block_tx, block_rx) = mpsc::unbounded_channel();
|
||||
debug!(
|
||||
@@ -660,6 +682,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
|
||||
"Launching a BeaconBlockStreamer";
|
||||
"blocks" => block_roots.len(),
|
||||
);
|
||||
let executor = self.beacon_chain.task_executor.clone();
|
||||
executor.spawn(self.stream(block_roots, block_tx), "get_blocks_sender");
|
||||
UnboundedReceiverStream::new(block_rx)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user