mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-02 20:34:27 +00:00
Use scoped rayon pool for backfill chain segment processing (#7924)
Part of #7866 - Continuation of #7921 In the above PR, we enabled rayon for batch KZG verification in chain segment processing. However, using the global rayon thread pool for backfill is likely to create resource contention with higher-priority beacon processor work. This PR introduces a dedicated low-priority rayon thread pool `LOW_PRIORITY_RAYON_POOL` and uses it for processing backfill chain segments. This prevents backfill KZG verification from using the global rayon thread pool and competing with high-priority beacon processor tasks for CPU resources. However, this PR by itself doesn't prevent CPU oversubscription because other tasks could still fill up the global rayon thread pool, and having an extra thread pool could make things worse. To address this we need the beacon processor to coordinate total CPU allocation across all tasks, which is covered in: - #7789 Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com> Co-Authored-By: Eitan Seri- Levi <eserilev@gmail.com> Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -980,6 +980,7 @@ dependencies = [
|
|||||||
"metrics",
|
"metrics",
|
||||||
"num_cpus",
|
"num_cpus",
|
||||||
"parking_lot 0.12.3",
|
"parking_lot 0.12.3",
|
||||||
|
"rayon",
|
||||||
"serde",
|
"serde",
|
||||||
"slot_clock",
|
"slot_clock",
|
||||||
"strum",
|
"strum",
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ logging = { workspace = true }
|
|||||||
metrics = { workspace = true }
|
metrics = { workspace = true }
|
||||||
num_cpus = { workspace = true }
|
num_cpus = { workspace = true }
|
||||||
parking_lot = { workspace = true }
|
parking_lot = { workspace = true }
|
||||||
|
rayon = { workspace = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
slot_clock = { workspace = true }
|
slot_clock = { workspace = true }
|
||||||
strum = { workspace = true }
|
strum = { workspace = true }
|
||||||
|
|||||||
@@ -38,6 +38,7 @@
|
|||||||
//! checks the queues to see if there are more parcels of work that can be spawned in a new worker
|
//! checks the queues to see if there are more parcels of work that can be spawned in a new worker
|
||||||
//! task.
|
//! task.
|
||||||
|
|
||||||
|
use crate::rayon_manager::RayonManager;
|
||||||
use crate::work_reprocessing_queue::{
|
use crate::work_reprocessing_queue::{
|
||||||
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
|
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
|
||||||
};
|
};
|
||||||
@@ -47,6 +48,7 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
|
|||||||
use logging::TimeLatch;
|
use logging::TimeLatch;
|
||||||
use logging::crit;
|
use logging::crit;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
|
use rayon::ThreadPool;
|
||||||
pub use scheduler::work_reprocessing_queue;
|
pub use scheduler::work_reprocessing_queue;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
@@ -74,6 +76,7 @@ use work_reprocessing_queue::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
mod metrics;
|
mod metrics;
|
||||||
|
pub mod rayon_manager;
|
||||||
pub mod scheduler;
|
pub mod scheduler;
|
||||||
|
|
||||||
/// The maximum size of the channel for work events to the `BeaconProcessor`.
|
/// The maximum size of the channel for work events to the `BeaconProcessor`.
|
||||||
@@ -603,7 +606,7 @@ pub enum Work<E: EthSpec> {
|
|||||||
process_fn: BlockingFn,
|
process_fn: BlockingFn,
|
||||||
},
|
},
|
||||||
ChainSegment(AsyncFn),
|
ChainSegment(AsyncFn),
|
||||||
ChainSegmentBackfill(AsyncFn),
|
ChainSegmentBackfill(BlockingFn),
|
||||||
Status(BlockingFn),
|
Status(BlockingFn),
|
||||||
BlocksByRangeRequest(AsyncFn),
|
BlocksByRangeRequest(AsyncFn),
|
||||||
BlocksByRootsRequest(AsyncFn),
|
BlocksByRootsRequest(AsyncFn),
|
||||||
@@ -807,6 +810,7 @@ pub struct BeaconProcessor<E: EthSpec> {
|
|||||||
pub network_globals: Arc<NetworkGlobals<E>>,
|
pub network_globals: Arc<NetworkGlobals<E>>,
|
||||||
pub executor: TaskExecutor,
|
pub executor: TaskExecutor,
|
||||||
pub current_workers: usize,
|
pub current_workers: usize,
|
||||||
|
pub rayon_manager: RayonManager,
|
||||||
pub config: BeaconProcessorConfig,
|
pub config: BeaconProcessorConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1603,7 +1607,17 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
|||||||
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
|
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
|
||||||
task_spawner.spawn_async(work)
|
task_spawner.spawn_async(work)
|
||||||
}
|
}
|
||||||
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
|
Work::ChainSegmentBackfill(process_fn) => {
|
||||||
|
if self.config.enable_backfill_rate_limiting {
|
||||||
|
task_spawner.spawn_blocking_with_rayon(
|
||||||
|
self.rayon_manager.low_priority_threadpool.clone(),
|
||||||
|
process_fn,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
// use the global rayon thread pool if backfill rate limiting is disabled.
|
||||||
|
task_spawner.spawn_blocking(process_fn)
|
||||||
|
}
|
||||||
|
}
|
||||||
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
|
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
|
||||||
BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
|
BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
|
||||||
BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
|
BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
|
||||||
@@ -1665,6 +1679,22 @@ impl TaskSpawner {
|
|||||||
WORKER_TASK_NAME,
|
WORKER_TASK_NAME,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Spawns a blocking task on a rayon thread pool, dropping the `SendOnDrop` after task completion.
|
||||||
|
fn spawn_blocking_with_rayon<F>(self, thread_pool: Arc<ThreadPool>, task: F)
|
||||||
|
where
|
||||||
|
F: FnOnce() + Send + 'static,
|
||||||
|
{
|
||||||
|
self.executor.spawn_blocking(
|
||||||
|
move || {
|
||||||
|
thread_pool.install(|| {
|
||||||
|
task();
|
||||||
|
});
|
||||||
|
drop(self.send_idle_on_drop)
|
||||||
|
},
|
||||||
|
WORKER_TASK_NAME,
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This struct will send a message on `self.tx` when it is dropped. An error will be logged
|
/// This struct will send a message on `self.tx` when it is dropped. An error will be logged
|
||||||
|
|||||||
27
beacon_node/beacon_processor/src/rayon_manager.rs
Normal file
27
beacon_node/beacon_processor/src/rayon_manager.rs
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
use rayon::{ThreadPool, ThreadPoolBuilder};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
const DEFAULT_LOW_PRIORITY_DIVISOR: usize = 4;
|
||||||
|
const MINIMUM_LOW_PRIORITY_THREAD_COUNT: usize = 1;
|
||||||
|
|
||||||
|
pub struct RayonManager {
|
||||||
|
/// Smaller rayon thread pool for lower-priority, compute-intensive tasks.
|
||||||
|
/// By default ~25% of CPUs or a minimum of 1 thread.
|
||||||
|
pub low_priority_threadpool: Arc<ThreadPool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for RayonManager {
|
||||||
|
fn default() -> Self {
|
||||||
|
let low_prio_threads =
|
||||||
|
(num_cpus::get() / DEFAULT_LOW_PRIORITY_DIVISOR).max(MINIMUM_LOW_PRIORITY_THREAD_COUNT);
|
||||||
|
let low_priority_threadpool = Arc::new(
|
||||||
|
ThreadPoolBuilder::new()
|
||||||
|
.num_threads(low_prio_threads)
|
||||||
|
.build()
|
||||||
|
.expect("failed to build low-priority rayon pool"),
|
||||||
|
);
|
||||||
|
Self {
|
||||||
|
low_priority_threadpool,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -173,7 +173,7 @@ pub struct IgnoredRpcBlock {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// A backfill batch work that has been queued for processing later.
|
/// A backfill batch work that has been queued for processing later.
|
||||||
pub struct QueuedBackfillBatch(pub AsyncFn);
|
pub struct QueuedBackfillBatch(pub BlockingFn);
|
||||||
|
|
||||||
pub struct QueuedColumnReconstruction {
|
pub struct QueuedColumnReconstruction {
|
||||||
pub block_root: Hash256,
|
pub block_root: Hash256,
|
||||||
@@ -1084,7 +1084,7 @@ mod tests {
|
|||||||
// Now queue a backfill sync batch.
|
// Now queue a backfill sync batch.
|
||||||
work_reprocessing_tx
|
work_reprocessing_tx
|
||||||
.try_send(ReprocessQueueMessage::BackfillSync(QueuedBackfillBatch(
|
.try_send(ReprocessQueueMessage::BackfillSync(QueuedBackfillBatch(
|
||||||
Box::pin(async {}),
|
Box::new(|| {}),
|
||||||
)))
|
)))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ use beacon_chain::{
|
|||||||
store::{HotColdDB, ItemStore, StoreConfig},
|
store::{HotColdDB, ItemStore, StoreConfig},
|
||||||
};
|
};
|
||||||
use beacon_chain::{Kzg, LightClientProducerEvent};
|
use beacon_chain::{Kzg, LightClientProducerEvent};
|
||||||
|
use beacon_processor::rayon_manager::RayonManager;
|
||||||
use beacon_processor::{BeaconProcessor, BeaconProcessorChannels};
|
use beacon_processor::{BeaconProcessor, BeaconProcessorChannels};
|
||||||
use beacon_processor::{BeaconProcessorConfig, BeaconProcessorQueueLengths};
|
use beacon_processor::{BeaconProcessorConfig, BeaconProcessorQueueLengths};
|
||||||
use environment::RuntimeContext;
|
use environment::RuntimeContext;
|
||||||
@@ -680,6 +681,7 @@ where
|
|||||||
executor: beacon_processor_context.executor.clone(),
|
executor: beacon_processor_context.executor.clone(),
|
||||||
current_workers: 0,
|
current_workers: 0,
|
||||||
config: beacon_processor_config,
|
config: beacon_processor_config,
|
||||||
|
rayon_manager: RayonManager::default(),
|
||||||
}
|
}
|
||||||
.spawn_manager(
|
.spawn_manager(
|
||||||
beacon_processor_channels.beacon_processor_rx,
|
beacon_processor_channels.beacon_processor_rx,
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ use beacon_chain::{
|
|||||||
};
|
};
|
||||||
use beacon_processor::{
|
use beacon_processor::{
|
||||||
BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig, BeaconProcessorQueueLengths,
|
BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig, BeaconProcessorQueueLengths,
|
||||||
|
rayon_manager::RayonManager,
|
||||||
};
|
};
|
||||||
use directory::DEFAULT_ROOT_DIR;
|
use directory::DEFAULT_ROOT_DIR;
|
||||||
use eth2::{BeaconNodeHttpClient, Timeouts};
|
use eth2::{BeaconNodeHttpClient, Timeouts};
|
||||||
@@ -247,6 +248,7 @@ pub async fn create_api_server_with_config<T: BeaconChainTypes>(
|
|||||||
executor: test_runtime.task_executor.clone(),
|
executor: test_runtime.task_executor.clone(),
|
||||||
current_workers: 0,
|
current_workers: 0,
|
||||||
config: beacon_processor_config,
|
config: beacon_processor_config,
|
||||||
|
rayon_manager: RayonManager::default(),
|
||||||
}
|
}
|
||||||
.spawn_manager(
|
.spawn_manager(
|
||||||
beacon_processor_rx,
|
beacon_processor_rx,
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ pub const SPAN_PROCESS_RPC_BLOCK: &str = "process_rpc_block";
|
|||||||
pub const SPAN_PROCESS_RPC_BLOBS: &str = "process_rpc_blobs";
|
pub const SPAN_PROCESS_RPC_BLOBS: &str = "process_rpc_blobs";
|
||||||
pub const SPAN_PROCESS_RPC_CUSTODY_COLUMNS: &str = "process_rpc_custody_columns";
|
pub const SPAN_PROCESS_RPC_CUSTODY_COLUMNS: &str = "process_rpc_custody_columns";
|
||||||
pub const SPAN_PROCESS_CHAIN_SEGMENT: &str = "process_chain_segment";
|
pub const SPAN_PROCESS_CHAIN_SEGMENT: &str = "process_chain_segment";
|
||||||
|
pub const SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL: &str = "process_chain_segment_backfill";
|
||||||
|
|
||||||
/// Fork choice root spans
|
/// Fork choice root spans
|
||||||
pub const SPAN_RECOMPUTE_HEAD: &str = "recompute_head_at_slot";
|
pub const SPAN_RECOMPUTE_HEAD: &str = "recompute_head_at_slot";
|
||||||
@@ -61,6 +62,7 @@ pub const LH_BN_ROOT_SPAN_NAMES: &[&str] = &[
|
|||||||
SPAN_PROCESS_RPC_BLOBS,
|
SPAN_PROCESS_RPC_BLOBS,
|
||||||
SPAN_PROCESS_RPC_CUSTODY_COLUMNS,
|
SPAN_PROCESS_RPC_CUSTODY_COLUMNS,
|
||||||
SPAN_PROCESS_CHAIN_SEGMENT,
|
SPAN_PROCESS_CHAIN_SEGMENT,
|
||||||
|
SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL,
|
||||||
SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST,
|
SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST,
|
||||||
SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST,
|
SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST,
|
||||||
SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST,
|
SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST,
|
||||||
|
|||||||
@@ -6,9 +6,7 @@ use beacon_chain::data_column_verification::{GossipDataColumnError, observe_goss
|
|||||||
use beacon_chain::fetch_blobs::{
|
use beacon_chain::fetch_blobs::{
|
||||||
EngineGetBlobsOutput, FetchEngineBlobError, fetch_and_process_engine_blobs,
|
EngineGetBlobsOutput, FetchEngineBlobError, fetch_and_process_engine_blobs,
|
||||||
};
|
};
|
||||||
use beacon_chain::{
|
use beacon_chain::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError};
|
||||||
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer,
|
|
||||||
};
|
|
||||||
use beacon_processor::{
|
use beacon_processor::{
|
||||||
BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work,
|
BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work,
|
||||||
WorkEvent as BeaconWorkEvent,
|
WorkEvent as BeaconWorkEvent,
|
||||||
@@ -500,33 +498,23 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
process_id: ChainSegmentProcessId,
|
process_id: ChainSegmentProcessId,
|
||||||
blocks: Vec<RpcBlock<T::EthSpec>>,
|
blocks: Vec<RpcBlock<T::EthSpec>>,
|
||||||
) -> Result<(), Error<T::EthSpec>> {
|
) -> Result<(), Error<T::EthSpec>> {
|
||||||
let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. });
|
|
||||||
debug!(blocks = blocks.len(), id = ?process_id, "Batch sending for process");
|
debug!(blocks = blocks.len(), id = ?process_id, "Batch sending for process");
|
||||||
|
|
||||||
let processor = self.clone();
|
let processor = self.clone();
|
||||||
let process_fn = async move {
|
|
||||||
let notify_execution_layer = if processor
|
|
||||||
.network_globals
|
|
||||||
.sync_state
|
|
||||||
.read()
|
|
||||||
.is_syncing_finalized()
|
|
||||||
{
|
|
||||||
NotifyExecutionLayer::No
|
|
||||||
} else {
|
|
||||||
NotifyExecutionLayer::Yes
|
|
||||||
};
|
|
||||||
processor
|
|
||||||
.process_chain_segment(process_id, blocks, notify_execution_layer)
|
|
||||||
.await;
|
|
||||||
};
|
|
||||||
let process_fn = Box::pin(process_fn);
|
|
||||||
|
|
||||||
// Back-sync batches are dispatched with a different `Work` variant so
|
// Back-sync batches are dispatched with a different `Work` variant so
|
||||||
// they can be rate-limited.
|
// they can be rate-limited.
|
||||||
let work = if is_backfill {
|
let work = match process_id {
|
||||||
Work::ChainSegmentBackfill(process_fn)
|
ChainSegmentProcessId::RangeBatchId(_, _) => {
|
||||||
} else {
|
let process_fn = async move {
|
||||||
Work::ChainSegment(process_fn)
|
processor.process_chain_segment(process_id, blocks).await;
|
||||||
|
};
|
||||||
|
Work::ChainSegment(Box::pin(process_fn))
|
||||||
|
}
|
||||||
|
ChainSegmentProcessId::BackSyncBatchId(_) => {
|
||||||
|
let process_fn =
|
||||||
|
move || processor.process_chain_segment_backfill(process_id, blocks);
|
||||||
|
Work::ChainSegmentBackfill(Box::new(process_fn))
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
self.try_send(BeaconWorkEvent {
|
self.try_send(BeaconWorkEvent {
|
||||||
|
|||||||
@@ -19,9 +19,10 @@ use beacon_processor::{
|
|||||||
use beacon_processor::{Work, WorkEvent};
|
use beacon_processor::{Work, WorkEvent};
|
||||||
use lighthouse_network::PeerAction;
|
use lighthouse_network::PeerAction;
|
||||||
use lighthouse_tracing::{
|
use lighthouse_tracing::{
|
||||||
SPAN_PROCESS_CHAIN_SEGMENT, SPAN_PROCESS_RPC_BLOBS, SPAN_PROCESS_RPC_BLOCK,
|
SPAN_PROCESS_CHAIN_SEGMENT, SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL, SPAN_PROCESS_RPC_BLOBS,
|
||||||
SPAN_PROCESS_RPC_CUSTODY_COLUMNS,
|
SPAN_PROCESS_RPC_BLOCK, SPAN_PROCESS_RPC_CUSTODY_COLUMNS,
|
||||||
};
|
};
|
||||||
|
use logging::crit;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use store::KzgCommitment;
|
use store::KzgCommitment;
|
||||||
@@ -434,27 +435,42 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
parent = None,
|
parent = None,
|
||||||
level = "debug",
|
level = "debug",
|
||||||
skip_all,
|
skip_all,
|
||||||
fields(sync_type = ?sync_type, downloaded_blocks = downloaded_blocks.len())
|
fields(process_id = ?process_id, downloaded_blocks = downloaded_blocks.len())
|
||||||
)]
|
)]
|
||||||
pub async fn process_chain_segment(
|
pub async fn process_chain_segment(
|
||||||
&self,
|
&self,
|
||||||
sync_type: ChainSegmentProcessId,
|
process_id: ChainSegmentProcessId,
|
||||||
downloaded_blocks: Vec<RpcBlock<T::EthSpec>>,
|
downloaded_blocks: Vec<RpcBlock<T::EthSpec>>,
|
||||||
notify_execution_layer: NotifyExecutionLayer,
|
|
||||||
) {
|
) {
|
||||||
let result = match sync_type {
|
let ChainSegmentProcessId::RangeBatchId(chain_id, epoch) = process_id else {
|
||||||
// this a request from the range sync
|
// This is a request from range sync, this should _never_ happen
|
||||||
ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => {
|
crit!(
|
||||||
let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64());
|
error = "process_chain_segment called on a variant other than RangeBatchId",
|
||||||
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
|
"Please notify the devs"
|
||||||
let sent_blocks = downloaded_blocks.len();
|
);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
match self
|
let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64());
|
||||||
.process_blocks(downloaded_blocks.iter(), notify_execution_layer)
|
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
|
||||||
.await
|
let sent_blocks = downloaded_blocks.len();
|
||||||
{
|
let notify_execution_layer = if self
|
||||||
(imported_blocks, Ok(_)) => {
|
.network_globals
|
||||||
debug!(
|
.sync_state
|
||||||
|
.read()
|
||||||
|
.is_syncing_finalized()
|
||||||
|
{
|
||||||
|
NotifyExecutionLayer::No
|
||||||
|
} else {
|
||||||
|
NotifyExecutionLayer::Yes
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = match self
|
||||||
|
.process_blocks(downloaded_blocks.iter(), notify_execution_layer)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
(imported_blocks, Ok(_)) => {
|
||||||
|
debug!(
|
||||||
batch_epoch = %epoch,
|
batch_epoch = %epoch,
|
||||||
first_block_slot = start_slot,
|
first_block_slot = start_slot,
|
||||||
chain = chain_id,
|
chain = chain_id,
|
||||||
@@ -462,13 +478,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
processed_blocks = sent_blocks,
|
processed_blocks = sent_blocks,
|
||||||
service= "sync",
|
service= "sync",
|
||||||
"Batch processed");
|
"Batch processed");
|
||||||
BatchProcessResult::Success {
|
BatchProcessResult::Success {
|
||||||
sent_blocks,
|
sent_blocks,
|
||||||
imported_blocks,
|
imported_blocks,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(imported_blocks, Err(e)) => {
|
(imported_blocks, Err(e)) => {
|
||||||
debug!(
|
debug!(
|
||||||
batch_epoch = %epoch,
|
batch_epoch = %epoch,
|
||||||
first_block_slot = start_slot,
|
first_block_slot = start_slot,
|
||||||
chain = chain_id,
|
chain = chain_id,
|
||||||
@@ -477,33 +493,61 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
error = %e.message,
|
error = %e.message,
|
||||||
service = "sync",
|
service = "sync",
|
||||||
"Batch processing failed");
|
"Batch processing failed");
|
||||||
match e.peer_action {
|
match e.peer_action {
|
||||||
Some(penalty) => BatchProcessResult::FaultyFailure {
|
Some(penalty) => BatchProcessResult::FaultyFailure {
|
||||||
imported_blocks,
|
imported_blocks,
|
||||||
penalty,
|
penalty,
|
||||||
},
|
},
|
||||||
None => BatchProcessResult::NonFaultyFailure,
|
None => BatchProcessResult::NonFaultyFailure,
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// this a request from the Backfill sync
|
};
|
||||||
ChainSegmentProcessId::BackSyncBatchId(epoch) => {
|
|
||||||
let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64());
|
|
||||||
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
|
|
||||||
let sent_blocks = downloaded_blocks.len();
|
|
||||||
let n_blobs = downloaded_blocks
|
|
||||||
.iter()
|
|
||||||
.map(|wrapped| wrapped.n_blobs())
|
|
||||||
.sum::<usize>();
|
|
||||||
let n_data_columns = downloaded_blocks
|
|
||||||
.iter()
|
|
||||||
.map(|wrapped| wrapped.n_data_columns())
|
|
||||||
.sum::<usize>();
|
|
||||||
|
|
||||||
match self.process_backfill_blocks(downloaded_blocks) {
|
self.send_sync_message(SyncMessage::BatchProcessed {
|
||||||
(imported_blocks, Ok(_)) => {
|
sync_type: process_id,
|
||||||
debug!(
|
result,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
|
||||||
|
/// thread if more blocks are needed to process it.
|
||||||
|
#[instrument(
|
||||||
|
name = SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL,
|
||||||
|
parent = None,
|
||||||
|
level = "debug",
|
||||||
|
skip_all,
|
||||||
|
fields(downloaded_blocks = downloaded_blocks.len())
|
||||||
|
)]
|
||||||
|
pub fn process_chain_segment_backfill(
|
||||||
|
&self,
|
||||||
|
process_id: ChainSegmentProcessId,
|
||||||
|
downloaded_blocks: Vec<RpcBlock<T::EthSpec>>,
|
||||||
|
) {
|
||||||
|
let ChainSegmentProcessId::BackSyncBatchId(epoch) = process_id else {
|
||||||
|
// this a request from RangeSync, this should _never_ happen
|
||||||
|
crit!(
|
||||||
|
error =
|
||||||
|
"process_chain_segment_backfill called on a variant other than BackSyncBatchId",
|
||||||
|
"Please notify the devs"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64());
|
||||||
|
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
|
||||||
|
let sent_blocks = downloaded_blocks.len();
|
||||||
|
let n_blobs = downloaded_blocks
|
||||||
|
.iter()
|
||||||
|
.map(|wrapped| wrapped.n_blobs())
|
||||||
|
.sum::<usize>();
|
||||||
|
let n_data_columns = downloaded_blocks
|
||||||
|
.iter()
|
||||||
|
.map(|wrapped| wrapped.n_data_columns())
|
||||||
|
.sum::<usize>();
|
||||||
|
|
||||||
|
let result = match self.process_backfill_blocks(downloaded_blocks) {
|
||||||
|
(imported_blocks, Ok(_)) => {
|
||||||
|
debug!(
|
||||||
batch_epoch = %epoch,
|
batch_epoch = %epoch,
|
||||||
first_block_slot = start_slot,
|
first_block_slot = start_slot,
|
||||||
keep_execution_payload = !self.chain.store.get_config().prune_payloads,
|
keep_execution_payload = !self.chain.store.get_config().prune_payloads,
|
||||||
@@ -513,34 +557,35 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
processed_data_columns = n_data_columns,
|
processed_data_columns = n_data_columns,
|
||||||
service= "sync",
|
service= "sync",
|
||||||
"Backfill batch processed");
|
"Backfill batch processed");
|
||||||
BatchProcessResult::Success {
|
BatchProcessResult::Success {
|
||||||
sent_blocks,
|
sent_blocks,
|
||||||
imported_blocks,
|
imported_blocks,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(_, Err(e)) => {
|
(_, Err(e)) => {
|
||||||
debug!(
|
debug!(
|
||||||
batch_epoch = %epoch,
|
batch_epoch = %epoch,
|
||||||
first_block_slot = start_slot,
|
first_block_slot = start_slot,
|
||||||
last_block_slot = end_slot,
|
last_block_slot = end_slot,
|
||||||
processed_blobs = n_blobs,
|
processed_blobs = n_blobs,
|
||||||
error = %e.message,
|
error = %e.message,
|
||||||
service = "sync",
|
service = "sync",
|
||||||
"Backfill batch processing failed"
|
"Backfill batch processing failed"
|
||||||
);
|
);
|
||||||
match e.peer_action {
|
match e.peer_action {
|
||||||
Some(penalty) => BatchProcessResult::FaultyFailure {
|
Some(penalty) => BatchProcessResult::FaultyFailure {
|
||||||
imported_blocks: 0,
|
imported_blocks: 0,
|
||||||
penalty,
|
penalty,
|
||||||
},
|
},
|
||||||
None => BatchProcessResult::NonFaultyFailure,
|
None => BatchProcessResult::NonFaultyFailure,
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
|
self.send_sync_message(SyncMessage::BatchProcessed {
|
||||||
|
sync_type: process_id,
|
||||||
|
result,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Helper function to process blocks batches which only consumes the chain and blocks to process.
|
/// Helper function to process blocks batches which only consumes the chain and blocks to process.
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ use beacon_chain::test_utils::{
|
|||||||
test_spec,
|
test_spec,
|
||||||
};
|
};
|
||||||
use beacon_chain::{BeaconChain, WhenSlotSkipped};
|
use beacon_chain::{BeaconChain, WhenSlotSkipped};
|
||||||
|
use beacon_processor::rayon_manager::RayonManager;
|
||||||
use beacon_processor::{work_reprocessing_queue::*, *};
|
use beacon_processor::{work_reprocessing_queue::*, *};
|
||||||
use gossipsub::MessageAcceptance;
|
use gossipsub::MessageAcceptance;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
@@ -266,6 +267,7 @@ impl TestRig {
|
|||||||
executor,
|
executor,
|
||||||
current_workers: 0,
|
current_workers: 0,
|
||||||
config: beacon_processor_config,
|
config: beacon_processor_config,
|
||||||
|
rayon_manager: RayonManager::default(),
|
||||||
}
|
}
|
||||||
.spawn_manager(
|
.spawn_manager(
|
||||||
beacon_processor_rx,
|
beacon_processor_rx,
|
||||||
@@ -458,10 +460,10 @@ impl TestRig {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn enqueue_backfill_batch(&self) {
|
pub fn enqueue_backfill_batch(&self, epoch: Epoch) {
|
||||||
self.network_beacon_processor
|
self.network_beacon_processor
|
||||||
.send_chain_segment(
|
.send_chain_segment(
|
||||||
ChainSegmentProcessId::BackSyncBatchId(Epoch::default()),
|
ChainSegmentProcessId::BackSyncBatchId(epoch),
|
||||||
Vec::default(),
|
Vec::default(),
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
@@ -606,7 +608,7 @@ impl TestRig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn assert_event_journal(&mut self, expected: &[&str]) {
|
pub async fn assert_event_journal(&mut self, expected: &[&str]) {
|
||||||
self.assert_event_journal_with_timeout(expected, STANDARD_TIMEOUT)
|
self.assert_event_journal_with_timeout(expected, STANDARD_TIMEOUT, false, false)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -623,6 +625,8 @@ impl TestRig {
|
|||||||
.chain(std::iter::once(NOTHING_TO_DO))
|
.chain(std::iter::once(NOTHING_TO_DO))
|
||||||
.collect::<Vec<_>>(),
|
.collect::<Vec<_>>(),
|
||||||
timeout,
|
timeout,
|
||||||
|
false,
|
||||||
|
false,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
@@ -666,11 +670,21 @@ impl TestRig {
|
|||||||
&mut self,
|
&mut self,
|
||||||
expected: &[&str],
|
expected: &[&str],
|
||||||
timeout: Duration,
|
timeout: Duration,
|
||||||
|
ignore_worker_freed: bool,
|
||||||
|
ignore_nothing_to_do: bool,
|
||||||
) {
|
) {
|
||||||
let mut events = Vec::with_capacity(expected.len());
|
let mut events = Vec::with_capacity(expected.len());
|
||||||
|
|
||||||
let drain_future = async {
|
let drain_future = async {
|
||||||
while let Some(event) = self.work_journal_rx.recv().await {
|
while let Some(event) = self.work_journal_rx.recv().await {
|
||||||
|
if event == WORKER_FREED && ignore_worker_freed {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if event == NOTHING_TO_DO && ignore_nothing_to_do {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
events.push(event);
|
events.push(event);
|
||||||
|
|
||||||
// Break as soon as we collect the desired number of events.
|
// Break as soon as we collect the desired number of events.
|
||||||
@@ -1384,6 +1398,8 @@ async fn requeue_unknown_block_gossip_attestation_without_import() {
|
|||||||
NOTHING_TO_DO,
|
NOTHING_TO_DO,
|
||||||
],
|
],
|
||||||
Duration::from_secs(1) + QUEUED_ATTESTATION_DELAY,
|
Duration::from_secs(1) + QUEUED_ATTESTATION_DELAY,
|
||||||
|
false,
|
||||||
|
false,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -1424,6 +1440,8 @@ async fn requeue_unknown_block_gossip_aggregated_attestation_without_import() {
|
|||||||
NOTHING_TO_DO,
|
NOTHING_TO_DO,
|
||||||
],
|
],
|
||||||
Duration::from_secs(1) + QUEUED_ATTESTATION_DELAY,
|
Duration::from_secs(1) + QUEUED_ATTESTATION_DELAY,
|
||||||
|
false,
|
||||||
|
false,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -1558,8 +1576,8 @@ async fn test_backfill_sync_processing() {
|
|||||||
// (not straight forward to manipulate `TestingSlotClock` due to cloning of `SlotClock` in code)
|
// (not straight forward to manipulate `TestingSlotClock` due to cloning of `SlotClock` in code)
|
||||||
// and makes the test very slow, hence timing calculation is unit tested separately in
|
// and makes the test very slow, hence timing calculation is unit tested separately in
|
||||||
// `work_reprocessing_queue`.
|
// `work_reprocessing_queue`.
|
||||||
for _ in 0..1 {
|
for i in 0..1 {
|
||||||
rig.enqueue_backfill_batch();
|
rig.enqueue_backfill_batch(Epoch::new(i));
|
||||||
// ensure queued batch is not processed until later
|
// ensure queued batch is not processed until later
|
||||||
rig.assert_no_events_for(Duration::from_millis(100)).await;
|
rig.assert_no_events_for(Duration::from_millis(100)).await;
|
||||||
// A new batch should be processed within a slot.
|
// A new batch should be processed within a slot.
|
||||||
@@ -1570,6 +1588,8 @@ async fn test_backfill_sync_processing() {
|
|||||||
NOTHING_TO_DO,
|
NOTHING_TO_DO,
|
||||||
],
|
],
|
||||||
rig.chain.slot_clock.slot_duration(),
|
rig.chain.slot_clock.slot_duration(),
|
||||||
|
false,
|
||||||
|
false,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
@@ -1590,8 +1610,8 @@ async fn test_backfill_sync_processing_rate_limiting_disabled() {
|
|||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
for _ in 0..3 {
|
for i in 0..3 {
|
||||||
rig.enqueue_backfill_batch();
|
rig.enqueue_backfill_batch(Epoch::new(i));
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensure all batches are processed
|
// ensure all batches are processed
|
||||||
@@ -1602,6 +1622,8 @@ async fn test_backfill_sync_processing_rate_limiting_disabled() {
|
|||||||
WorkType::ChainSegmentBackfill.into(),
|
WorkType::ChainSegmentBackfill.into(),
|
||||||
],
|
],
|
||||||
Duration::from_millis(100),
|
Duration::from_millis(100),
|
||||||
|
true,
|
||||||
|
true,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user