Remove sync await points (#3036)

## Issue Addressed

Removes the await points in sync that wait for a processor response during RPC block processing. Built on top of #3029.
This also fixes a couple of bugs in the previous code and adds a relatively comprehensive test suite.
This commit is contained in:
Divma
2022-03-23 01:09:39 +00:00
parent af50130e21
commit 788b6af3c4
13 changed files with 1722 additions and 727 deletions

View File

@@ -38,9 +38,10 @@
//! checks the queues to see if there are more parcels of work that can be spawned in a new worker
//! task.
use crate::sync::manager::BlockProcessType;
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::parking_lot::Mutex;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, GossipVerifiedBlock};
use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock};
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
use lighthouse_network::{
@@ -57,7 +58,7 @@ use std::task::Context;
use std::time::Duration;
use std::{cmp, collections::HashSet};
use task_executor::TaskExecutor;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId,
@@ -74,7 +75,7 @@ mod work_reprocessing_queue;
mod worker;
use crate::beacon_processor::work_reprocessing_queue::QueuedBlock;
pub use worker::{GossipAggregatePackage, GossipAttestationPackage, ProcessId};
pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage};
/// The maximum size of the channel for work events to the `BeaconProcessor`.
///
@@ -196,10 +197,6 @@ pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
/// Used to send/receive results from a rpc block import in a blocking task.
pub type BlockResultSender<E> = oneshot::Sender<Result<Hash256, BlockError<E>>>;
pub type BlockResultReceiver<E> = oneshot::Receiver<Result<Hash256, BlockError<E>>>;
/// A simple first-in-first-out queue with a maximum length.
struct FifoQueue<T> {
queue: VecDeque<T>,
@@ -496,18 +493,22 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
/// sent to the other side of `result_tx`.
pub fn rpc_beacon_block(
block: Box<SignedBeaconBlock<T::EthSpec>>,
) -> (Self, BlockResultReceiver<T::EthSpec>) {
let (result_tx, result_rx) = oneshot::channel();
let event = Self {
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> Self {
Self {
drop_during_sync: false,
work: Work::RpcBlock { block, result_tx },
};
(event, result_rx)
work: Work::RpcBlock {
block,
seen_timestamp,
process_type,
},
}
}
/// Create a new work event to import `blocks` as a beacon chain segment.
pub fn chain_segment(
process_id: ProcessId,
process_id: ChainSegmentProcessId,
blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
) -> Self {
Self {
@@ -692,10 +693,11 @@ pub enum Work<T: BeaconChainTypes> {
},
RpcBlock {
block: Box<SignedBeaconBlock<T::EthSpec>>,
result_tx: BlockResultSender<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
},
ChainSegment {
process_id: ProcessId,
process_id: ChainSegmentProcessId,
blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
},
Status {
@@ -1488,10 +1490,15 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
/*
* Verification for beacon blocks received during syncing via RPC.
*/
Work::RpcBlock { block, result_tx } => {
Work::RpcBlock {
block,
seen_timestamp,
process_type,
} => {
worker.process_rpc_block(
*block,
result_tx,
seen_timestamp,
process_type,
work_reprocessing_tx.clone(),
duplicate_cache,
);

View File

@@ -240,7 +240,13 @@ impl TestRig {
}
pub fn enqueue_rpc_block(&self) {
let (event, _rx) = WorkEvent::rpc_beacon_block(Box::new(self.next_block.clone()));
let event = WorkEvent::rpc_beacon_block(
Box::new(self.next_block.clone()),
std::time::Duration::default(),
BlockProcessType::ParentLookup {
chain_hash: Hash256::random(),
},
);
self.beacon_processor_tx.try_send(event).unwrap();
}

View File

@@ -10,7 +10,7 @@ mod rpc_methods;
mod sync_methods;
pub use gossip_methods::{GossipAggregatePackage, GossipAttestationPackage};
pub use sync_methods::ProcessId;
pub use sync_methods::ChainSegmentProcessId;
pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1;

View File

@@ -1,26 +1,28 @@
use std::time::Duration;
use super::{super::work_reprocessing_queue::ReprocessQueueMessage, Worker};
use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE;
use crate::beacon_processor::{BlockResultSender, DuplicateCache};
use crate::beacon_processor::DuplicateCache;
use crate::metrics;
use crate::sync::manager::{SyncMessage, SyncRequestType};
use crate::sync::manager::{BlockProcessType, SyncMessage};
use crate::sync::{BatchProcessResult, ChainId};
use beacon_chain::{
BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError,
};
use lighthouse_network::{PeerAction, PeerId};
use slog::{crit, debug, error, info, trace, warn};
use lighthouse_network::PeerAction;
use slog::{debug, error, info, trace, warn};
use tokio::sync::mpsc;
use types::{Epoch, Hash256, SignedBeaconBlock};
/// Id associated to a block processing request, either a batch or a single block.
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
pub enum ProcessId {
pub enum ChainSegmentProcessId {
/// Processing Id of a range syncing batch.
RangeBatchId(ChainId, Epoch),
/// Processing ID for a backfill syncing batch.
BackSyncBatchId(Epoch),
/// Processing Id of the parent lookup of a block.
ParentLookup(PeerId, Hash256),
ParentLookup(Hash256),
}
/// Returned when a chain segment import fails.
@@ -32,88 +34,77 @@ struct ChainSegmentFailed {
}
impl<T: BeaconChainTypes> Worker<T> {
/// Attempt to process a block received from a direct RPC request, returning the processing
/// result on the `result_tx` channel.
///
/// Raises a log if there are errors publishing the result to the channel.
/// Attempt to process a block received from a direct RPC request.
pub fn process_rpc_block(
self,
block: SignedBeaconBlock<T::EthSpec>,
result_tx: BlockResultSender<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
duplicate_cache: DuplicateCache,
) {
let block_root = block.canonical_root();
// Checks if the block is already being imported through another source
if let Some(handle) = duplicate_cache.check_and_insert(block_root) {
let slot = block.slot();
let block_result = self.chain.process_block(block);
// Check if the block is already being imported through another source
let handle = match duplicate_cache.check_and_insert(block.canonical_root()) {
Some(handle) => handle,
None => {
// Sync handles these results
self.send_sync_message(SyncMessage::BlockProcessed {
process_type,
result: Err(BlockError::BlockIsAlreadyKnown),
});
return;
}
};
let slot = block.slot();
let result = self.chain.process_block(block);
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
if let Ok(root) = &block_result {
info!(
self.log,
"New RPC block received";
"slot" => slot,
"hash" => %root
// RPC block imported, regardless of process type
if let &Ok(hash) = &result {
info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash);
// Trigger processing for work referencing this block.
let reprocess_msg = ReprocessQueueMessage::BlockImported(hash);
if reprocess_tx.try_send(reprocess_msg).is_err() {
error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash)
};
if matches!(process_type, BlockProcessType::SingleBlock { .. }) {
self.chain.block_times_cache.write().set_time_observed(
hash,
slot,
seen_timestamp,
None,
None,
);
if reprocess_tx
.try_send(ReprocessQueueMessage::BlockImported(*root))
.is_err()
{
error!(
self.log,
"Failed to inform block import";
"source" => "rpc",
"block_root" => %root,
)
};
}
if result_tx.send(block_result).is_err() {
crit!(self.log, "Failed return sync block result");
}
// Drop the handle to remove the entry from the cache
drop(handle);
} else {
debug!(
self.log,
"Gossip block is being imported";
"block_root" => %block_root,
);
// The gossip block that is being imported should eventually
// trigger reprocessing of queued attestations once it is imported.
// If the gossip block fails import, then it will be downscored
// appropriately in `process_gossip_block`.
// Here, we assume that the block will eventually be imported and
// send a `BlockIsAlreadyKnown` message to sync.
if result_tx
.send(Err(BlockError::BlockIsAlreadyKnown))
.is_err()
{
crit!(self.log, "Failed return sync block result");
self.run_fork_choice()
}
}
// Sync handles these results
self.send_sync_message(SyncMessage::BlockProcessed {
process_type,
result: result.map(|_| ()),
});
// Drop the handle to remove the entry from the cache
drop(handle);
}
/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
/// thread if more blocks are needed to process it.
pub fn process_chain_segment(
&self,
process_id: ProcessId,
sync_type: ChainSegmentProcessId,
downloaded_blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
) {
match process_id {
let result = match sync_type {
// this a request from the range sync
ProcessId::RangeBatchId(chain_id, epoch) => {
ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => {
let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64());
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
let sent_blocks = downloaded_blocks.len();
let result = match self.process_blocks(downloaded_blocks.iter()) {
match self.process_blocks(downloaded_blocks.iter()) {
(_, Ok(_)) => {
debug!(self.log, "Batch processed";
"batch_epoch" => epoch,
@@ -139,19 +130,15 @@ impl<T: BeaconChainTypes> Worker<T> {
peer_action: e.peer_action,
}
}
};
let sync_type = SyncRequestType::RangeSync(epoch, chain_id);
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
}
}
// this a request from the Backfill sync
ProcessId::BackSyncBatchId(epoch) => {
ChainSegmentProcessId::BackSyncBatchId(epoch) => {
let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64());
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
let sent_blocks = downloaded_blocks.len();
let result = match self.process_backfill_blocks(&downloaded_blocks) {
match self.process_backfill_blocks(&downloaded_blocks) {
(_, Ok(_)) => {
debug!(self.log, "Backfill batch processed";
"batch_epoch" => epoch,
@@ -173,35 +160,34 @@ impl<T: BeaconChainTypes> Worker<T> {
peer_action: e.peer_action,
}
}
};
let sync_type = SyncRequestType::BackFillSync(epoch);
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
}
}
// this is a parent lookup request from the sync manager
ProcessId::ParentLookup(peer_id, chain_head) => {
ChainSegmentProcessId::ParentLookup(chain_head) => {
debug!(
self.log, "Processing parent lookup";
"last_peer_id" => %peer_id,
"chain_hash" => %chain_head,
"blocks" => downloaded_blocks.len()
);
// parent blocks are ordered from highest slot to lowest, so we need to process in
// reverse
match self.process_blocks(downloaded_blocks.iter().rev()) {
(_, Err(e)) => {
debug!(self.log, "Parent lookup failed"; "last_peer_id" => %peer_id, "error" => %e.message);
self.send_sync_message(SyncMessage::ParentLookupFailed {
peer_id,
chain_head,
})
(imported_blocks, Err(e)) => {
debug!(self.log, "Parent lookup failed"; "error" => %e.message);
BatchProcessResult::Failed {
imported_blocks: imported_blocks > 0,
peer_action: e.peer_action,
}
}
(_, Ok(_)) => {
(imported_blocks, Ok(_)) => {
debug!(self.log, "Parent lookup processed successfully");
BatchProcessResult::Success(imported_blocks > 0)
}
}
}
}
};
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
}
/// Helper function to process blocks batches which only consumes the chain and blocks to process.