Use spawn_async in ByRoot handling workers (#5557)

* Use spawn_async in ByRoot handling workers

* Box large variants (the `GossipAttestation` and `GossipAggregate` payloads of `Work`)
This commit is contained in:
Lion - dapplion
2024-04-13 04:30:04 +09:00
committed by GitHub
parent 116a55e8a5
commit b6a1c863a2
3 changed files with 184 additions and 239 deletions

View File

@@ -571,7 +571,7 @@ pub enum BlockingOrAsync {
/// queuing specifics.
pub enum Work<E: EthSpec> {
GossipAttestation {
attestation: GossipAttestationPackage<E>,
attestation: Box<GossipAttestationPackage<E>>,
process_individual: Box<dyn FnOnce(GossipAttestationPackage<E>) + Send + Sync>,
process_batch: Box<dyn FnOnce(Vec<GossipAttestationPackage<E>>) + Send + Sync>,
},
@@ -583,7 +583,7 @@ pub enum Work<E: EthSpec> {
process_batch: Box<dyn FnOnce(Vec<GossipAttestationPackage<E>>) + Send + Sync>,
},
GossipAggregate {
aggregate: GossipAggregatePackage<E>,
aggregate: Box<GossipAggregatePackage<E>>,
process_individual: Box<dyn FnOnce(GossipAggregatePackage<E>) + Send + Sync>,
process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
},
@@ -624,8 +624,8 @@ pub enum Work<E: EthSpec> {
ChainSegment(AsyncFn),
ChainSegmentBackfill(AsyncFn),
Status(BlockingFn),
BlocksByRangeRequest(BlockingFnWithManualSendOnIdle),
BlocksByRootsRequest(BlockingFnWithManualSendOnIdle),
BlocksByRangeRequest(AsyncFn),
BlocksByRootsRequest(AsyncFn),
BlobsByRangeRequest(BlockingFn),
BlobsByRootsRequest(BlockingFn),
GossipBlsToExecutionChange(BlockingFn),
@@ -1015,7 +1015,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
process_individual: _,
process_batch,
} => {
aggregates.push(aggregate);
aggregates.push(*aggregate);
if process_batch_opt.is_none() {
process_batch_opt = Some(process_batch);
}
@@ -1075,7 +1075,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
process_individual: _,
process_batch,
} => {
attestations.push(attestation);
attestations.push(*attestation);
if process_batch_opt.is_none() {
process_batch_opt = Some(process_batch);
}
@@ -1445,7 +1445,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
process_individual,
process_batch: _,
} => task_spawner.spawn_blocking(move || {
process_individual(attestation);
process_individual(*attestation);
}),
Work::GossipAttestationBatch {
attestations,
@@ -1458,7 +1458,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
process_individual,
process_batch: _,
} => task_spawner.spawn_blocking(move || {
process_individual(aggregate);
process_individual(*aggregate);
}),
Work::GossipAggregateBatch {
aggregates,
@@ -1493,7 +1493,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
task_spawner.spawn_blocking(process_fn)
}
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
task_spawner.spawn_blocking_with_manual_send_idle(work)
task_spawner.spawn_async(work)
}
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
@@ -1555,23 +1555,6 @@ impl TaskSpawner {
WORKER_TASK_NAME,
)
}
/// Spawn a blocking task, passing the `SendOnDrop` into the task.
///
/// ## Notes
///
/// Users must ensure the `SendOnDrop` is dropped at the appropriate time!
pub fn spawn_blocking_with_manual_send_idle<F>(self, task: F)
where
F: FnOnce(SendOnDrop) + Send + 'static,
{
self.executor.spawn_blocking(
|| {
task(self.send_idle_on_drop);
},
WORKER_TASK_NAME,
)
}
}
/// This struct will send a message on `self.tx` when it is dropped. An error will be logged on

View File

@@ -102,14 +102,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.try_send(BeaconWorkEvent {
drop_during_sync: true,
work: Work::GossipAttestation {
attestation: GossipAttestationPackage {
attestation: Box::new(GossipAttestationPackage {
message_id,
peer_id,
attestation: Box::new(attestation),
subnet_id,
should_import,
seen_timestamp,
},
}),
process_individual: Box::new(process_individual),
process_batch: Box::new(process_batch),
},
@@ -148,13 +148,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.try_send(BeaconWorkEvent {
drop_during_sync: true,
work: Work::GossipAggregate {
aggregate: GossipAggregatePackage {
aggregate: Box::new(GossipAggregatePackage {
message_id,
peer_id,
aggregate: Box::new(aggregate),
beacon_block_root,
seen_timestamp,
},
}),
process_individual: Box::new(process_individual),
process_batch: Box::new(process_batch),
},
@@ -508,20 +508,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
request: BlocksByRangeRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = move |send_idle_on_drop| {
let process_fn = async move {
let executor = processor.executor.clone();
processor.handle_blocks_by_range_request(
executor,
send_idle_on_drop,
peer_id,
request_id,
request,
)
processor
.handle_blocks_by_range_request(executor, peer_id, request_id, request)
.await;
};
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::BlocksByRangeRequest(Box::new(process_fn)),
work: Work::BlocksByRangeRequest(Box::pin(process_fn)),
})
}
@@ -533,20 +529,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
request: BlocksByRootRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = move |send_idle_on_drop| {
let process_fn = async move {
let executor = processor.executor.clone();
processor.handle_blocks_by_root_request(
executor,
send_idle_on_drop,
peer_id,
request_id,
request,
)
processor
.handle_blocks_by_root_request(executor, peer_id, request_id, request)
.await;
};
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::BlocksByRootsRequest(Box::new(process_fn)),
work: Work::BlocksByRootsRequest(Box::pin(process_fn)),
})
}

View File

@@ -3,7 +3,6 @@ use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::SyncMessage;
use beacon_chain::{BeaconChainError, BeaconChainTypes, HistoricalBlockError, WhenSlotSkipped};
use beacon_processor::SendOnDrop;
use itertools::process_results;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
use lighthouse_network::rpc::*;
@@ -128,10 +127,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
/// Handle a `BlocksByRoot` request from the peer.
pub fn handle_blocks_by_root_request(
pub async fn handle_blocks_by_root_request(
self: Arc<Self>,
executor: TaskExecutor,
send_on_drop: SendOnDrop,
peer_id: PeerId,
request_id: PeerRequestId,
request: BlocksByRootRequest,
@@ -145,72 +143,61 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e),
};
// Fetching blocks is async because it may have to hit the execution layer for payloads.
executor.spawn(
async move {
let mut send_block_count = 0;
let mut send_response = true;
while let Some((root, result)) = block_stream.next().await {
match result.as_ref() {
Ok(Some(block)) => {
self.send_response(
peer_id,
Response::BlocksByRoot(Some(block.clone())),
request_id,
);
send_block_count += 1;
}
Ok(None) => {
debug!(
self.log,
"Peer requested unknown block";
"peer" => %peer_id,
"request_root" => ?root
);
}
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
debug!(
self.log,
"Failed to fetch execution payload for blocks by root request";
"block_root" => ?root,
"reason" => "execution layer not synced",
);
// send the stream terminator
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Execution layer not synced".into(),
request_id,
);
send_response = false;
break;
}
Err(e) => {
debug!(
self.log,
"Error fetching block for peer";
"peer" => %peer_id,
"request_root" => ?root,
"error" => ?e,
);
}
}
let mut send_block_count = 0;
while let Some((root, result)) = block_stream.next().await {
match result.as_ref() {
Ok(Some(block)) => {
self.send_response(
peer_id,
Response::BlocksByRoot(Some(block.clone())),
request_id,
);
send_block_count += 1;
}
debug!(
self.log,
"Received BlocksByRoot Request";
"peer" => %peer_id,
"requested" => requested_blocks,
"returned" => %send_block_count
);
Ok(None) => {
debug!(
self.log,
"Peer requested unknown block";
"peer" => %peer_id,
"request_root" => ?root
);
}
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
debug!(
self.log,
"Failed to fetch execution payload for blocks by root request";
"block_root" => ?root,
"reason" => "execution layer not synced",
);
// send the stream terminator
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Execution layer not synced".into(),
request_id,
);
}
Err(e) => {
debug!(
self.log,
"Error fetching block for peer";
"peer" => %peer_id,
"request_root" => ?root,
"error" => ?e,
);
}
}
}
debug!(
self.log,
"Received BlocksByRoot Request";
"peer" => %peer_id,
"requested" => requested_blocks,
"returned" => %send_block_count
);
// send stream termination
if send_response {
self.send_response(peer_id, Response::BlocksByRoot(None), request_id);
}
drop(send_on_drop);
},
"load_blocks_by_root_blocks",
)
// send stream termination
self.send_response(peer_id, Response::BlocksByRoot(None), request_id);
}
/// Handle a `BlobsByRoot` request from the peer.
@@ -386,10 +373,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
/// Handle a `BlocksByRange` request from the peer.
pub fn handle_blocks_by_range_request(
pub async fn handle_blocks_by_range_request(
self: Arc<Self>,
executor: TaskExecutor,
send_on_drop: SendOnDrop,
peer_id: PeerId,
request_id: PeerRequestId,
req: BlocksByRangeRequest,
@@ -499,137 +485,121 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
};
// Fetching blocks is async because it may have to hit the execution layer for payloads.
executor.spawn(
async move {
let mut blocks_sent = 0;
let mut send_response = true;
let mut blocks_sent = 0;
while let Some((root, result)) = block_stream.next().await {
match result.as_ref() {
Ok(Some(block)) => {
// Due to skip slots, blocks could be out of the range, we ensure they
// are in the range before sending
if block.slot() >= *req.start_slot()
&& block.slot() < req.start_slot() + req.count()
{
blocks_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlocksByRange(Some(block.clone())),
id: request_id,
});
}
}
Ok(None) => {
error!(
self.log,
"Block in the chain is not in the store";
"request" => ?req,
"peer" => %peer_id,
"request_root" => ?root
);
self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"Database inconsistency".into(),
request_id,
);
send_response = false;
break;
}
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
debug!(
self.log,
"Failed to fetch execution payload for blocks by range request";
"block_root" => ?root,
"reason" => "execution layer not synced",
);
// send the stream terminator
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Execution layer not synced".into(),
request_id,
);
send_response = false;
break;
}
Err(e) => {
if matches!(
e,
BeaconChainError::ExecutionLayerErrorPayloadReconstruction(_block_hash, ref boxed_error)
if matches!(**boxed_error, execution_layer::Error::EngineError(_))
) {
warn!(
self.log,
"Error rebuilding payload for peer";
"info" => "this may occur occasionally when the EE is busy",
"block_root" => ?root,
"error" => ?e,
);
} else {
error!(
self.log,
"Error fetching block for peer";
"block_root" => ?root,
"error" => ?e
);
}
// send the stream terminator
self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"Failed fetching blocks".into(),
request_id,
);
send_response = false;
break;
}
while let Some((root, result)) = block_stream.next().await {
match result.as_ref() {
Ok(Some(block)) => {
// Due to skip slots, blocks could be out of the range, we ensure they
// are in the range before sending
if block.slot() >= *req.start_slot()
&& block.slot() < req.start_slot() + req.count()
{
blocks_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlocksByRange(Some(block.clone())),
id: request_id,
});
}
}
let current_slot = self
.chain
.slot()
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
if blocks_sent < (*req.count() as usize) {
debug!(
Ok(None) => {
error!(
self.log,
"BlocksByRange outgoing response processed";
"Block in the chain is not in the store";
"request" => ?req,
"peer" => %peer_id,
"msg" => "Failed to return all requested blocks",
"start_slot" => req.start_slot(),
"current_slot" => current_slot,
"requested" => req.count(),
"returned" => blocks_sent
"request_root" => ?root
);
} else {
debug!(
self.log,
"BlocksByRange outgoing response processed";
"peer" => %peer_id,
"start_slot" => req.start_slot(),
"current_slot" => current_slot,
"requested" => req.count(),
"returned" => blocks_sent
);
}
if send_response {
// send the stream terminator
self.send_network_message(NetworkMessage::SendResponse {
return self.send_error_response(
peer_id,
response: Response::BlocksByRange(None),
id: request_id,
});
RPCResponseErrorCode::ServerError,
"Database inconsistency".into(),
request_id,
);
}
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
debug!(
self.log,
"Failed to fetch execution payload for blocks by range request";
"block_root" => ?root,
"reason" => "execution layer not synced",
);
// send the stream terminator
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Execution layer not synced".into(),
request_id,
);
}
Err(e) => {
if matches!(
e,
BeaconChainError::ExecutionLayerErrorPayloadReconstruction(_block_hash, ref boxed_error)
if matches!(**boxed_error, execution_layer::Error::EngineError(_))
) {
warn!(
self.log,
"Error rebuilding payload for peer";
"info" => "this may occur occasionally when the EE is busy",
"block_root" => ?root,
"error" => ?e,
);
} else {
error!(
self.log,
"Error fetching block for peer";
"block_root" => ?root,
"error" => ?e
);
}
drop(send_on_drop);
},
"load_blocks_by_range_blocks",
);
// send the stream terminator
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"Failed fetching blocks".into(),
request_id,
);
}
}
}
let current_slot = self
.chain
.slot()
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
if blocks_sent < (*req.count() as usize) {
debug!(
self.log,
"BlocksByRange outgoing response processed";
"peer" => %peer_id,
"msg" => "Failed to return all requested blocks",
"start_slot" => req.start_slot(),
"current_slot" => current_slot,
"requested" => req.count(),
"returned" => blocks_sent
);
} else {
debug!(
self.log,
"BlocksByRange outgoing response processed";
"peer" => %peer_id,
"start_slot" => req.start_slot(),
"current_slot" => current_slot,
"requested" => req.count(),
"returned" => blocks_sent
);
}
// send the stream terminator
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlocksByRange(None),
id: request_id,
});
}
/// Handle a `BlobsByRange` request from the peer.