From f67084a571d14efb5473bf53fe32cf0128482973 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Fri, 20 Jun 2025 05:52:16 +0300 Subject: [PATCH] Remove reprocess channel (#7437) Partially addresses https://github.com/sigp/lighthouse/issues/6291 This PR removes the reprocess event channel from being externally exposed. All work events are now sent through the single `BeaconProcessorSend` channel. I've introduced a new `Work::Reprocess` enum variant which we then use to schedule jobs for reprocessing. I've also created a new scheduler module which will eventually house the different scheduler impls. This is all needed as an initial step to generalize the beacon processor. A "full" implementation for the generalized beacon processor can be found in https://github.com/sigp/lighthouse/pull/6448. I'm going to try to break up the full implementation into smaller PRs so it can actually be reviewed. --- beacon_node/beacon_processor/src/lib.rs | 44 ++++--- .../beacon_processor/src/scheduler/mod.rs | 1 + .../work_reprocessing_queue.rs | 0 beacon_node/client/src/builder.rs | 9 +- .../src/compute_light_client_updates.rs | 12 +- beacon_node/http_api/src/lib.rs | 14 +-- .../http_api/src/publish_attestations.rs | 19 +-- beacon_node/http_api/src/task_spawner.rs | 27 ++++ beacon_node/http_api/src/test_utils.rs | 6 - .../http_api/tests/interactive_tests.rs | 14 ++- .../gossip_methods.rs | 117 +++++++++++------- .../src/network_beacon_processor/mod.rs | 31 ++--- .../network_beacon_processor/sync_methods.rs | 23 +++- .../src/network_beacon_processor/tests.rs | 5 - beacon_node/network/src/router.rs | 6 +- beacon_node/network/src/service.rs | 6 +- beacon_node/network/src/service/tests.rs | 4 - 17 files changed, 186 insertions(+), 152 deletions(-) create mode 100644 beacon_node/beacon_processor/src/scheduler/mod.rs rename beacon_node/beacon_processor/src/{ => scheduler}/work_reprocessing_queue.rs (100%) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs 
index 659cb808d5..0f324071a1 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -47,6 +47,7 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId}; use logging::crit; use logging::TimeLatch; use parking_lot::Mutex; +pub use scheduler::work_reprocessing_queue; use serde::{Deserialize, Serialize}; use slot_clock::SlotClock; use std::cmp; @@ -73,7 +74,7 @@ use work_reprocessing_queue::{ use work_reprocessing_queue::{IgnoredRpcBlock, QueuedSamplingRequest}; mod metrics; -pub mod work_reprocessing_queue; +pub mod scheduler; /// The maximum size of the channel for work events to the `BeaconProcessor`. /// @@ -264,22 +265,16 @@ impl Default for BeaconProcessorConfig { pub struct BeaconProcessorChannels { pub beacon_processor_tx: BeaconProcessorSend, pub beacon_processor_rx: mpsc::Receiver>, - pub work_reprocessing_tx: mpsc::Sender, - pub work_reprocessing_rx: mpsc::Receiver, } impl BeaconProcessorChannels { pub fn new(config: &BeaconProcessorConfig) -> Self { let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(config.max_work_event_queue_len); - let (work_reprocessing_tx, work_reprocessing_rx) = - mpsc::channel(config.max_scheduled_work_queue_len); Self { beacon_processor_tx: BeaconProcessorSend(beacon_processor_tx), beacon_processor_rx, - work_reprocessing_rx, - work_reprocessing_tx, } } } @@ -638,6 +633,7 @@ pub enum Work { LightClientUpdatesByRangeRequest(BlockingFn), ApiRequestP0(BlockingOrAsync), ApiRequestP1(BlockingOrAsync), + Reprocess(ReprocessQueueMessage), } impl fmt::Debug for Work { @@ -692,6 +688,7 @@ pub enum WorkType { LightClientUpdatesByRangeRequest, ApiRequestP0, ApiRequestP1, + Reprocess, } impl Work { @@ -750,6 +747,7 @@ impl Work { } Work::ApiRequestP0 { .. } => WorkType::ApiRequestP0, Work::ApiRequestP1 { .. } => WorkType::ApiRequestP1, + Work::Reprocess { .. 
} => WorkType::Reprocess, } } } @@ -774,7 +772,7 @@ struct InboundEvents { /// Used by upstream processes to send new work to the `BeaconProcessor`. event_rx: mpsc::Receiver>, /// Used internally for queuing work ready to be re-processed. - reprocess_work_rx: mpsc::Receiver, + ready_work_rx: mpsc::Receiver, } impl Stream for InboundEvents { @@ -795,7 +793,7 @@ impl Stream for InboundEvents { // Poll for delayed blocks before polling for new work. It might be the case that a delayed // block is required to successfully process some new work. - match self.reprocess_work_rx.poll_recv(cx) { + match self.ready_work_rx.poll_recv(cx) { Poll::Ready(Some(ready_work)) => { return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into()))); } @@ -846,8 +844,6 @@ impl BeaconProcessor { pub fn spawn_manager( mut self, event_rx: mpsc::Receiver>, - work_reprocessing_tx: mpsc::Sender, - work_reprocessing_rx: mpsc::Receiver, work_journal_tx: Option>, slot_clock: S, maximum_gossip_clock_disparity: Duration, @@ -935,9 +931,13 @@ impl BeaconProcessor { // receive them back once they are ready (`ready_work_rx`). 
let (ready_work_tx, ready_work_rx) = mpsc::channel::(self.config.max_scheduled_work_queue_len); + + let (reprocess_work_tx, reprocess_work_rx) = + mpsc::channel::(self.config.max_scheduled_work_queue_len); + spawn_reprocess_scheduler( ready_work_tx, - work_reprocessing_rx, + reprocess_work_rx, &self.executor, Arc::new(slot_clock), maximum_gossip_clock_disparity, @@ -951,7 +951,7 @@ impl BeaconProcessor { let mut inbound_events = InboundEvents { idle_rx, event_rx, - reprocess_work_rx: ready_work_rx, + ready_work_rx, }; let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting; @@ -965,7 +965,7 @@ impl BeaconProcessor { Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => { match QueuedBackfillBatch::try_from(event) { Ok(backfill_batch) => { - match work_reprocessing_tx + match reprocess_work_tx .try_send(ReprocessQueueMessage::BackfillSync(backfill_batch)) { Err(e) => { @@ -1027,8 +1027,10 @@ impl BeaconProcessor { .unwrap_or(WORKER_FREED); // We don't care if this message was successfully sent, we only use the journal - // during testing. - let _ = work_journal_tx.try_send(id); + // during testing. We also ignore reprocess messages to ensure our test cases can pass. + if id != "reprocess" { + let _ = work_journal_tx.try_send(id); + } } let can_spawn = self.current_workers < self.config.max_workers; @@ -1318,6 +1320,14 @@ impl BeaconProcessor { let work_type = work.to_type(); match work { + Work::Reprocess(work_event) => { + if let Err(e) = reprocess_work_tx.try_send(work_event) { + error!( + error = ?e, + "Failed to reprocess work event" + ) + } + } _ if can_spawn => self.spawn_worker(work, idle_tx), Work::GossipAttestation { .. 
} => attestation_queue.push(work), // Attestation batches are formed internally within the @@ -1488,6 +1498,7 @@ impl BeaconProcessor { WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(), WorkType::ApiRequestP0 => api_request_p0_queue.len(), WorkType::ApiRequestP1 => api_request_p1_queue.len(), + WorkType::Reprocess => 0, }; metrics::observe_vec( &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, @@ -1639,6 +1650,7 @@ impl BeaconProcessor { | Work::LightClientUpdatesByRangeRequest(process_fn) => { task_spawner.spawn_blocking(process_fn) } + Work::Reprocess(_) => {} }; } } diff --git a/beacon_node/beacon_processor/src/scheduler/mod.rs b/beacon_node/beacon_processor/src/scheduler/mod.rs new file mode 100644 index 0000000000..e1a076a7c5 --- /dev/null +++ b/beacon_node/beacon_processor/src/scheduler/mod.rs @@ -0,0 +1 @@ +pub mod work_reprocessing_queue; diff --git a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs similarity index 100% rename from beacon_node/beacon_processor/src/work_reprocessing_queue.rs rename to beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 94e6961455..baea0c06e5 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -546,7 +546,6 @@ where network_senders: None, network_globals: None, beacon_processor_send: None, - beacon_processor_reprocess_send: None, eth1_service: Some(genesis_service.eth1_service.clone()), sse_logging_components: runtime_context.sse_logging_components.clone(), }); @@ -638,7 +637,6 @@ where context.executor, libp2p_registry.as_mut(), beacon_processor_channels.beacon_processor_tx.clone(), - beacon_processor_channels.work_reprocessing_tx.clone(), ) .await .map_err(|e| format!("Failed to start network: {:?}", e))?; @@ -777,9 +775,6 @@ where network_globals: self.network_globals.clone(), 
eth1_service: self.eth1_service.clone(), beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()), - beacon_processor_reprocess_send: Some( - beacon_processor_channels.work_reprocessing_tx.clone(), - ), sse_logging_components: runtime_context.sse_logging_components.clone(), }); @@ -843,8 +838,6 @@ where } .spawn_manager( beacon_processor_channels.beacon_processor_rx, - beacon_processor_channels.work_reprocessing_tx.clone(), - beacon_processor_channels.work_reprocessing_rx, None, beacon_chain.slot_clock.clone(), beacon_chain.spec.maximum_gossip_clock_disparity(), @@ -918,7 +911,7 @@ where compute_light_client_updates( &inner_chain, light_client_server_rv, - beacon_processor_channels.work_reprocessing_tx, + beacon_processor_channels.beacon_processor_tx, ) .await }, diff --git a/beacon_node/client/src/compute_light_client_updates.rs b/beacon_node/client/src/compute_light_client_updates.rs index fab284c428..75fa22e795 100644 --- a/beacon_node/client/src/compute_light_client_updates.rs +++ b/beacon_node/client/src/compute_light_client_updates.rs @@ -1,8 +1,8 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, LightClientProducerEvent}; use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage; +use beacon_processor::{BeaconProcessorSend, Work, WorkEvent}; use futures::channel::mpsc::Receiver; use futures::StreamExt; -use tokio::sync::mpsc::Sender; use tracing::error; // Each `LightClientProducerEvent` is ~200 bytes. With the light_client server producing only recent @@ -14,7 +14,7 @@ pub(crate) const LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY: usize = 32; pub async fn compute_light_client_updates( chain: &BeaconChain, mut light_client_server_rv: Receiver>, - reprocess_tx: Sender, + beacon_processor_send: BeaconProcessorSend, ) { // Should only receive events for recent blocks, import_block filters by blocks close to clock. 
// @@ -31,7 +31,13 @@ pub async fn compute_light_client_updates( }); let msg = ReprocessQueueMessage::NewLightClientOptimisticUpdate { parent_root }; - if reprocess_tx.try_send(msg).is_err() { + if beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: true, + work: Work::Reprocess(msg), + }) + .is_err() + { error!(%parent_root,"Failed to inform light client update") }; } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 9453f1725a..73b20197c1 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -40,7 +40,7 @@ use beacon_chain::{ validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped, }; -use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend}; +use beacon_processor::BeaconProcessorSend; pub use block_id::BlockId; use builder_states::get_next_withdrawals; use bytes::Bytes; @@ -130,7 +130,6 @@ pub struct Context { pub network_senders: Option>, pub network_globals: Option>>, pub beacon_processor_send: Option>, - pub beacon_processor_reprocess_send: Option>, pub eth1_service: Option, pub sse_logging_components: Option, } @@ -554,11 +553,6 @@ pub fn serve( .filter(|_| config.enable_beacon_processor); let task_spawner_filter = warp::any().map(move || TaskSpawner::new(beacon_processor_send.clone())); - let beacon_processor_reprocess_send = ctx - .beacon_processor_reprocess_send - .clone() - .filter(|_| config.enable_beacon_processor); - let reprocess_send_filter = warp::any().map(move || beacon_processor_reprocess_send.clone()); let duplicate_block_status_code = ctx.config.duplicate_block_status_code; @@ -1986,20 +1980,18 @@ pub fn serve( .and(warp_utils::json::json::>()) .and(optional_consensus_version_header_filter) .and(network_tx_filter.clone()) - .and(reprocess_send_filter.clone()) .then( |task_spawner: TaskSpawner, chain: Arc>, attestations: Vec, _fork_name: Option, - 
network_tx: UnboundedSender>, - reprocess_tx: Option>| async move { + network_tx: UnboundedSender>| async move { let result = crate::publish_attestations::publish_attestations( task_spawner, chain, attestations, network_tx, - reprocess_tx, + true, ) .await .map(|()| warp::reply::json(&())); diff --git a/beacon_node/http_api/src/publish_attestations.rs b/beacon_node/http_api/src/publish_attestations.rs index 3c18a8ec41..a4fcb27b1d 100644 --- a/beacon_node/http_api/src/publish_attestations.rs +++ b/beacon_node/http_api/src/publish_attestations.rs @@ -40,15 +40,13 @@ use beacon_chain::{ BeaconChainTypes, }; use beacon_processor::work_reprocessing_queue::{QueuedUnaggregate, ReprocessQueueMessage}; +use beacon_processor::{Work, WorkEvent}; use eth2::types::Failure; use lighthouse_network::PubsubMessage; use network::NetworkMessage; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{ - mpsc::{Sender, UnboundedSender}, - oneshot, -}; +use tokio::sync::{mpsc::UnboundedSender, oneshot}; use tracing::{debug, error, warn}; use types::SingleAttestation; @@ -130,7 +128,7 @@ pub async fn publish_attestations( chain: Arc>, attestations: Vec, network_tx: UnboundedSender>, - reprocess_send: Option>, + allow_reprocess: bool, ) -> Result<(), warp::Rejection> { // Collect metadata about attestations which we'll use to report failures. We need to // move the `attestations` vec into the blocking task, so this small overhead is unavoidable. @@ -142,6 +140,7 @@ pub async fn publish_attestations( // Gossip validate and publish attestations that can be immediately processed. 
let seen_timestamp = timestamp_now(); let mut prelim_results = task_spawner + .clone() .blocking_task(Priority::P0, move || { Ok(attestations .into_iter() @@ -156,7 +155,7 @@ pub async fn publish_attestations( Err(Error::Validation(AttestationError::UnknownHeadBlock { beacon_block_root, })) => { - let Some(reprocess_tx) = &reprocess_send else { + if !allow_reprocess { return PublishAttestationResult::Failure(Error::ReprocessDisabled); }; // Re-process. @@ -180,7 +179,13 @@ pub async fn publish_attestations( beacon_block_root, process_fn: Box::new(reprocess_fn), }); - if reprocess_tx.try_send(reprocess_msg).is_err() { + if task_spawner + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(reprocess_msg), + }) + .is_err() + { PublishAttestationResult::Failure(Error::ReprocessFull) } else { PublishAttestationResult::Reprocessing(rx) diff --git a/beacon_node/http_api/src/task_spawner.rs b/beacon_node/http_api/src/task_spawner.rs index a679b294f6..834cd29971 100644 --- a/beacon_node/http_api/src/task_spawner.rs +++ b/beacon_node/http_api/src/task_spawner.rs @@ -30,6 +30,7 @@ impl Priority { } /// Spawns tasks on the `BeaconProcessor` or directly on the tokio executor. +#[derive(Clone)] pub struct TaskSpawner { /// Used to send tasks to the `BeaconProcessor`. The tokio executor will be /// used if this is `None`. @@ -155,6 +156,32 @@ impl TaskSpawner { .and_then(|x| x) } } + + pub fn try_send(&self, work_event: WorkEvent) -> Result<(), warp::Rejection> { + if let Some(beacon_processor_send) = &self.beacon_processor_send { + let error_message = match beacon_processor_send.try_send(work_event) { + Ok(()) => None, + Err(TrySendError::Full(_)) => { + Some("The task was dropped. The server is overloaded.") + } + Err(TrySendError::Closed(_)) => { + Some("The task was dropped. 
The server is shutting down.") + } + }; + + if let Some(error_message) = error_message { + return Err(warp_utils::reject::custom_server_error( + error_message.to_string(), + )); + }; + + Ok(()) + } else { + Err(warp_utils::reject::custom_server_error( + "The beacon processor is unavailable".to_string(), + )) + } + } } /// Send a task to the beacon processor and await execution. diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index f78a361dad..9c285f4039 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -201,12 +201,9 @@ pub async fn create_api_server_with_config( let BeaconProcessorChannels { beacon_processor_tx, beacon_processor_rx, - work_reprocessing_tx, - work_reprocessing_rx, } = BeaconProcessorChannels::new(&beacon_processor_config); let beacon_processor_send = beacon_processor_tx; - let reprocess_send = work_reprocessing_tx.clone(); BeaconProcessor { network_globals: network_globals.clone(), executor: test_runtime.task_executor.clone(), @@ -215,8 +212,6 @@ pub async fn create_api_server_with_config( } .spawn_manager( beacon_processor_rx, - work_reprocessing_tx, - work_reprocessing_rx, None, chain.slot_clock.clone(), chain.spec.maximum_gossip_clock_disparity(), @@ -241,7 +236,6 @@ pub async fn create_api_server_with_config( network_senders: Some(network_senders), network_globals: Some(network_globals), beacon_processor_send: Some(beacon_processor_send), - beacon_processor_reprocess_send: Some(reprocess_send), eth1_service: Some(eth1_service), sse_logging_components: None, }); diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 399474c85c..1a31f1398a 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -4,7 +4,7 @@ use beacon_chain::{ test_utils::{AttestationStrategy, BlockStrategy, LightClientStrategy, SyncCommitteeStrategy}, ChainConfig, 
}; -use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage; +use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, Work, WorkEvent}; use eth2::types::ProduceBlockV3Response; use eth2::types::{DepositContractData, StateId}; use execution_layer::{ForkchoiceState, PayloadAttributes}; @@ -923,14 +923,16 @@ async fn queue_attestations_from_http() { .unwrap(); tester .ctx - .beacon_processor_reprocess_send + .beacon_processor_send .as_ref() .unwrap() - .send(ReprocessQueueMessage::BlockImported { - block_root, - parent_root, + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(ReprocessQueueMessage::BlockImported { + block_root, + parent_root, + }), }) - .await .unwrap(); attestation_future.await.unwrap(); diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index d1a75809a9..fcab34c93e 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -19,6 +19,7 @@ use beacon_chain::{ AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, GossipVerifiedBlock, NotifyExecutionLayer, }; +use beacon_processor::{Work, WorkEvent}; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use logging::crit; use operation_pool::ReceivedPreCapella; @@ -30,8 +31,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::SendError; +use tokio::sync::mpsc::error::TrySendError; use tracing::{debug, error, info, trace, warn}; use types::{ beacon_block::BlockImportSource, Attestation, AttestationData, AttestationRef, @@ -205,7 +205,7 @@ impl NetworkBeaconProcessor { attestation: Box, subnet_id: SubnetId, should_import: bool, - 
reprocess_tx: Option>, + allow_reprocess: bool, seen_timestamp: Duration, ) { let result = match self @@ -228,7 +228,7 @@ impl NetworkBeaconProcessor { message_id, peer_id, subnet_id, - reprocess_tx, + allow_reprocess, should_import, seen_timestamp, ); @@ -237,7 +237,7 @@ impl NetworkBeaconProcessor { pub fn process_gossip_attestation_batch( self: Arc, packages: GossipAttestationBatch, - reprocess_tx: Option>, + allow_reprocess: bool, ) { let attestations_and_subnets = packages .iter() @@ -298,7 +298,7 @@ impl NetworkBeaconProcessor { package.message_id, package.peer_id, package.subnet_id, - reprocess_tx.clone(), + allow_reprocess, package.should_import, package.seen_timestamp, ); @@ -314,7 +314,7 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, subnet_id: SubnetId, - reprocess_tx: Option>, + allow_reprocess: bool, should_import: bool, seen_timestamp: Duration, ) { @@ -398,7 +398,7 @@ impl NetworkBeaconProcessor { should_import, seen_timestamp, }, - reprocess_tx, + allow_reprocess, error, seen_timestamp, ); @@ -418,7 +418,7 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, aggregate: Box>, - reprocess_tx: Option>, + allow_reprocess: bool, seen_timestamp: Duration, ) { let beacon_block_root = aggregate.message().aggregate().data().beacon_block_root; @@ -442,7 +442,7 @@ impl NetworkBeaconProcessor { beacon_block_root, message_id, peer_id, - reprocess_tx, + allow_reprocess, seen_timestamp, ); } @@ -450,7 +450,7 @@ impl NetworkBeaconProcessor { pub fn process_gossip_aggregate_batch( self: Arc, packages: Vec>, - reprocess_tx: Option>, + allow_reprocess: bool, ) { let aggregates = packages.iter().map(|package| package.aggregate.as_ref()); @@ -504,7 +504,7 @@ impl NetworkBeaconProcessor { package.beacon_block_root, package.message_id, package.peer_id, - reprocess_tx.clone(), + allow_reprocess, package.seen_timestamp, ); } @@ -516,7 +516,7 @@ impl NetworkBeaconProcessor { beacon_block_root: Hash256, message_id: MessageId, 
peer_id: PeerId, - reprocess_tx: Option>, + allow_reprocess: bool, seen_timestamp: Duration, ) { match result { @@ -595,7 +595,7 @@ impl NetworkBeaconProcessor { attestation: signed_aggregate, seen_timestamp, }, - reprocess_tx, + allow_reprocess, error, seen_timestamp, ); @@ -1039,9 +1039,9 @@ impl NetworkBeaconProcessor { // another column arrives it either completes availability or pushes // reconstruction back a bit. let cloned_self = Arc::clone(self); - let send_result = self - .reprocess_tx - .send(ReprocessQueueMessage::DelayColumnReconstruction( + let send_result = self.beacon_processor_send.try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction( QueuedColumnReconstruction { block_root, process_fn: Box::pin(async move { @@ -1050,11 +1050,15 @@ impl NetworkBeaconProcessor { .await; }), }, - )) - .await; - if let Err(SendError(ReprocessQueueMessage::DelayColumnReconstruction( - reconstruction, - ))) = send_result + )), + }); + if let Err(TrySendError::Full(WorkEvent { + work: + Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction( + reconstruction, + )), + .. + })) = send_result { warn!("Unable to send reconstruction to reprocessing"); // Execute it immediately instead. 
@@ -1099,7 +1103,6 @@ impl NetworkBeaconProcessor { peer_id: PeerId, peer_client: Client, block: Arc>, - reprocess_tx: mpsc::Sender, duplicate_cache: DuplicateCache, invalid_block_storage: InvalidBlockStorage, seen_duration: Duration, @@ -1110,7 +1113,6 @@ impl NetworkBeaconProcessor { peer_id, peer_client, block.clone(), - reprocess_tx.clone(), seen_duration, ) .await @@ -1121,7 +1123,6 @@ impl NetworkBeaconProcessor { self.process_gossip_verified_block( peer_id, gossip_verified_block, - reprocess_tx, invalid_block_storage, seen_duration, ) @@ -1147,7 +1148,6 @@ impl NetworkBeaconProcessor { peer_id: PeerId, peer_client: Client, block: Arc>, - reprocess_tx: mpsc::Sender, seen_duration: Duration, ) -> Option> { let block_delay = @@ -1374,24 +1374,28 @@ impl NetworkBeaconProcessor { let inner_self = self.clone(); let process_fn = Box::pin(async move { - let reprocess_tx = inner_self.reprocess_tx.clone(); let invalid_block_storage = inner_self.invalid_block_storage.clone(); inner_self .process_gossip_verified_block( peer_id, verified_block, - reprocess_tx, invalid_block_storage, seen_duration, ) .await; }); - if reprocess_tx - .try_send(ReprocessQueueMessage::EarlyBlock(QueuedGossipBlock { - beacon_block_slot: block_slot, - beacon_block_root: block_root, - process_fn, - })) + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(ReprocessQueueMessage::EarlyBlock( + QueuedGossipBlock { + beacon_block_slot: block_slot, + beacon_block_root: block_root, + process_fn, + }, + )), + }) .is_err() { error!( @@ -1424,7 +1428,6 @@ impl NetworkBeaconProcessor { self: Arc, peer_id: PeerId, verified_block: GossipVerifiedBlock, - reprocess_tx: mpsc::Sender, invalid_block_storage: InvalidBlockStorage, _seen_duration: Duration, ) { @@ -1474,10 +1477,14 @@ impl NetworkBeaconProcessor { match &result { Ok(AvailabilityProcessingStatus::Imported(block_root)) => { - if reprocess_tx - .try_send(ReprocessQueueMessage::BlockImported { - 
block_root: *block_root, - parent_root: block.message().parent_root(), + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(ReprocessQueueMessage::BlockImported { + block_root: *block_root, + parent_root: block.message().parent_root(), + }), }) .is_err() { @@ -2002,7 +2009,7 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, light_client_optimistic_update: LightClientOptimisticUpdate, - reprocess_tx: Option>, + allow_reprocess: bool, seen_timestamp: Duration, ) { match self.chain.verify_optimistic_update_for_gossip( @@ -2030,7 +2037,7 @@ impl NetworkBeaconProcessor { "Optimistic update for unknown block" ); - if let Some(sender) = reprocess_tx { + if allow_reprocess { let processor = self.clone(); let msg = ReprocessQueueMessage::UnknownLightClientOptimisticUpdate( QueuedLightClientUpdate { @@ -2040,14 +2047,21 @@ impl NetworkBeaconProcessor { message_id, peer_id, light_client_optimistic_update, - None, // Do not reprocess this message again. + false, // Do not reprocess this message again. seen_timestamp, ) }), }, ); - if sender.try_send(msg).is_err() { + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: true, + work: Work::Reprocess(msg), + }) + .is_err() + { error!("Failed to send optimistic update for re-processing") } } else { @@ -2117,7 +2131,7 @@ impl NetworkBeaconProcessor { peer_id: PeerId, message_id: MessageId, failed_att: FailedAtt, - reprocess_tx: Option>, + allow_reprocess: bool, error: AttnError, seen_timestamp: Duration, ) { @@ -2357,7 +2371,7 @@ impl NetworkBeaconProcessor { block = ?beacon_block_root, "Attestation for unknown block" ); - if let Some(sender) = reprocess_tx { + if allow_reprocess { // We don't know the block, get the sync manager to handle the block lookup, and // send the attestation to be scheduled for re-processing. 
self.sync_tx @@ -2384,7 +2398,7 @@ impl NetworkBeaconProcessor { message_id, peer_id, attestation, - None, // Do not allow this attestation to be re-processed beyond this point. + false, // Do not allow this attestation to be re-processed beyond this point. seen_timestamp, ) }), @@ -2409,7 +2423,7 @@ impl NetworkBeaconProcessor { attestation, subnet_id, should_import, - None, // Do not allow this attestation to be re-processed beyond this point. + false, // Do not allow this attestation to be re-processed beyond this point. seen_timestamp, ) }), @@ -2417,7 +2431,14 @@ impl NetworkBeaconProcessor { } }; - if sender.try_send(msg).is_err() { + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(msg), + }) + .is_err() + { error!("Failed to send attestation for re-processing") } } else { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 311c09294b..637792ab37 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -12,8 +12,8 @@ use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer, }; use beacon_processor::{ - work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend, DuplicateCache, - GossipAggregatePackage, GossipAttestationPackage, Work, WorkEvent as BeaconWorkEvent, + BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work, + WorkEvent as BeaconWorkEvent, }; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, @@ -61,7 +61,6 @@ pub struct NetworkBeaconProcessor { pub chain: Arc>, pub network_tx: mpsc::UnboundedSender>, pub sync_tx: mpsc::UnboundedSender>, - pub reprocess_tx: mpsc::Sender, pub network_globals: Arc>, pub invalid_block_storage: InvalidBlockStorage, pub 
executor: TaskExecutor, @@ -88,24 +87,21 @@ impl NetworkBeaconProcessor { // Define a closure for processing individual attestations. let processor = self.clone(); let process_individual = move |package: GossipAttestationPackage| { - let reprocess_tx = processor.reprocess_tx.clone(); processor.process_gossip_attestation( package.message_id, package.peer_id, package.attestation, package.subnet_id, package.should_import, - Some(reprocess_tx), + true, package.seen_timestamp, ) }; // Define a closure for processing batches of attestations. let processor = self.clone(); - let process_batch = move |attestations| { - let reprocess_tx = processor.reprocess_tx.clone(); - processor.process_gossip_attestation_batch(attestations, Some(reprocess_tx)) - }; + let process_batch = + move |attestations| processor.process_gossip_attestation_batch(attestations, true); self.try_send(BeaconWorkEvent { drop_during_sync: true, @@ -135,22 +131,19 @@ impl NetworkBeaconProcessor { // Define a closure for processing individual attestations. let processor = self.clone(); let process_individual = move |package: GossipAggregatePackage| { - let reprocess_tx = processor.reprocess_tx.clone(); processor.process_gossip_aggregate( package.message_id, package.peer_id, package.aggregate, - Some(reprocess_tx), + true, package.seen_timestamp, ) }; // Define a closure for processing batches of attestations. 
let processor = self.clone(); - let process_batch = move |aggregates| { - let reprocess_tx = processor.reprocess_tx.clone(); - processor.process_gossip_aggregate_batch(aggregates, Some(reprocess_tx)) - }; + let process_batch = + move |aggregates| processor.process_gossip_aggregate_batch(aggregates, true); let beacon_block_root = aggregate.message().aggregate().data().beacon_block_root; self.try_send(BeaconWorkEvent { @@ -180,7 +173,6 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { let processor = self.clone(); let process_fn = async move { - let reprocess_tx = processor.reprocess_tx.clone(); let invalid_block_storage = processor.invalid_block_storage.clone(); let duplicate_cache = processor.duplicate_cache.clone(); processor @@ -189,7 +181,6 @@ impl NetworkBeaconProcessor { peer_id, peer_client, block, - reprocess_tx, duplicate_cache, invalid_block_storage, seen_timestamp, @@ -382,12 +373,11 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { let processor = self.clone(); let process_fn = move || { - let reprocess_tx = processor.reprocess_tx.clone(); processor.process_gossip_optimistic_update( message_id, peer_id, light_client_optimistic_update, - Some(reprocess_tx), + true, seen_timestamp, ) }; @@ -1132,8 +1122,6 @@ impl NetworkBeaconProcessor> { let BeaconProcessorChannels { beacon_processor_tx, beacon_processor_rx, - work_reprocessing_tx, - work_reprocessing_rx: _work_reprocessing_rx, } = <_>::default(); let (network_tx, _network_rx) = mpsc::unbounded_channel(); @@ -1144,7 +1132,6 @@ impl NetworkBeaconProcessor> { chain, network_tx, sync_tx, - reprocess_tx: work_reprocessing_tx, network_globals, invalid_block_storage: InvalidBlockStorage::Disabled, executor, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 31b17a41a4..cff6e26165 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ 
b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -17,11 +17,11 @@ use beacon_processor::{ work_reprocessing_queue::{QueuedRpcBlock, ReprocessQueueMessage}, AsyncFn, BlockingFn, DuplicateCache, }; +use beacon_processor::{Work, WorkEvent}; use lighthouse_network::PeerAction; use std::sync::Arc; use std::time::Duration; use store::KzgCommitment; -use tokio::sync::mpsc; use tracing::{debug, error, info, warn}; use types::beacon_block_body::format_kzg_commitments; use types::blob_sidecar::FixedBlobSidecarList; @@ -57,14 +57,12 @@ impl NetworkBeaconProcessor { process_type: BlockProcessType, ) -> AsyncFn { let process_fn = async move { - let reprocess_tx = self.reprocess_tx.clone(); let duplicate_cache = self.duplicate_cache.clone(); self.process_rpc_block( block_root, block, seen_timestamp, process_type, - reprocess_tx, duplicate_cache, ) .await; @@ -106,7 +104,6 @@ impl NetworkBeaconProcessor { block: RpcBlock, seen_timestamp: Duration, process_type: BlockProcessType, - reprocess_tx: mpsc::Sender, duplicate_cache: DuplicateCache, ) { // Check if the block is already being imported through another source @@ -131,7 +128,14 @@ impl NetworkBeaconProcessor { ignore_fn, }); - if reprocess_tx.try_send(reprocess_msg).is_err() { + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(reprocess_msg), + }) + .is_err() + { error!(source = "rpc", %block_root,"Failed to inform block import") }; return; @@ -176,7 +180,14 @@ impl NetworkBeaconProcessor { block_root: *hash, parent_root, }; - if reprocess_tx.try_send(reprocess_msg).is_err() { + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(reprocess_msg), + }) + .is_err() + { error!( source = "rpc", block_root = %hash, diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 4c107cfc87..109c361ebe 100644 --- 
a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -199,8 +199,6 @@ impl TestRig { let BeaconProcessorChannels { beacon_processor_tx, beacon_processor_rx, - work_reprocessing_tx, - work_reprocessing_rx, } = BeaconProcessorChannels::new(&beacon_processor_config); let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); @@ -244,7 +242,6 @@ impl TestRig { chain: harness.chain.clone(), network_tx, sync_tx, - reprocess_tx: work_reprocessing_tx.clone(), network_globals: network_globals.clone(), invalid_block_storage: InvalidBlockStorage::Disabled, executor: executor.clone(), @@ -259,8 +256,6 @@ impl TestRig { } .spawn_manager( beacon_processor_rx, - work_reprocessing_tx, - work_reprocessing_rx, Some(work_journal_tx), harness.chain.slot_clock.clone(), chain.spec.maximum_gossip_clock_disparity(), diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 960a1203a6..5d5daae4ae 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -10,9 +10,7 @@ use crate::service::NetworkMessage; use crate::status::status_message; use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use beacon_processor::{ - work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend, DuplicateCache, -}; +use beacon_processor::{BeaconProcessorSend, DuplicateCache}; use futures::prelude::*; use lighthouse_network::rpc::*; use lighthouse_network::{ @@ -87,7 +85,6 @@ impl Router { executor: task_executor::TaskExecutor, invalid_block_storage: InvalidBlockStorage, beacon_processor_send: BeaconProcessorSend, - beacon_processor_reprocess_tx: mpsc::Sender, fork_context: Arc, ) -> Result>, String> { trace!("Service starting"); @@ -103,7 +100,6 @@ impl Router { chain: beacon_chain.clone(), network_tx: network_send.clone(), sync_tx: sync_send.clone(), - reprocess_tx: beacon_processor_reprocess_tx, network_globals: network_globals.clone(), 
invalid_block_storage, executor: executor.clone(), diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index b120d67007..0a6d515232 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -6,7 +6,7 @@ use crate::router::{Router, RouterMessage}; use crate::subnet_service::{SubnetService, SubnetServiceMessage, Subscription}; use crate::NetworkConfig; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend}; +use beacon_processor::BeaconProcessorSend; use futures::channel::mpsc::Sender; use futures::future::OptionFuture; use futures::prelude::*; @@ -211,7 +211,6 @@ impl NetworkService { executor: task_executor::TaskExecutor, libp2p_registry: Option<&'_ mut Registry>, beacon_processor_send: BeaconProcessorSend, - beacon_processor_reprocess_tx: mpsc::Sender, ) -> Result< ( NetworkService, @@ -315,7 +314,6 @@ impl NetworkService { executor.clone(), invalid_block_storage, beacon_processor_send, - beacon_processor_reprocess_tx, fork_context.clone(), )?; @@ -367,7 +365,6 @@ impl NetworkService { executor: task_executor::TaskExecutor, libp2p_registry: Option<&'_ mut Registry>, beacon_processor_send: BeaconProcessorSend, - beacon_processor_reprocess_tx: mpsc::Sender, ) -> Result<(Arc>, NetworkSenders), String> { let (network_service, network_globals, network_senders) = Self::build( beacon_chain, @@ -375,7 +372,6 @@ impl NetworkService { executor.clone(), libp2p_registry, beacon_processor_send, - beacon_processor_reprocess_tx, ) .await?; diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index 15c3321e94..db34211747 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -58,8 +58,6 @@ fn test_dht_persistence() { let BeaconProcessorChannels { beacon_processor_tx, beacon_processor_rx: _beacon_processor_rx, - work_reprocessing_tx, - 
work_reprocessing_rx: _work_reprocessing_rx, } = <_>::default(); let _network_service = NetworkService::start( @@ -68,7 +66,6 @@ fn test_dht_persistence() { executor, None, beacon_processor_tx, - work_reprocessing_tx, ) .await .unwrap(); @@ -137,7 +134,6 @@ fn test_removing_topic_weight_on_old_topics() { executor.clone(), None, beacon_processor_channels.beacon_processor_tx, - beacon_processor_channels.work_reprocessing_tx, ) .await .unwrap()