Deduplicate block root computation (#3590)

## Issue Addressed

NA

## Proposed Changes

This PR removes duplicated block root computation.

Computing the `SignedBeaconBlock::canonical_root` has become more expensive since the merge as we need to compute the merke root of each transaction inside an `ExecutionPayload`.

Computing the root for [a mainnet block](https://beaconcha.in/slot/4704236) is taking ~10ms on my i7-8700K CPU @ 3.70GHz (no sha extensions). Given that our median seen-to-imported time for blocks is presently 300-400ms, removing a few duplicated block roots (~30ms) could represent an easy 10% improvement. When we consider that the seen-to-imported times include operations *after* the block has been placed in the early attester cache, we could expect the 30ms to be more significant WRT our seen-to-attestable times.

## Additional Info

NA
This commit is contained in:
Paul Hauner
2022-09-23 03:52:42 +00:00
parent 76ba0a1aaf
commit fa6ad1a11a
23 changed files with 252 additions and 106 deletions

View File

@@ -489,6 +489,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
/// Create a new `Work` event for some block, where the result from computation (if any) is
/// sent to the other side of `result_tx`.
pub fn rpc_beacon_block(
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
seen_timestamp: Duration,
process_type: BlockProcessType,
@@ -496,6 +497,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
Self {
drop_during_sync: false,
work: Work::RpcBlock {
block_root,
block,
seen_timestamp,
process_type,
@@ -577,6 +579,7 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
},
},
ReadyWork::RpcBlock(QueuedRpcBlock {
block_root,
block,
seen_timestamp,
process_type,
@@ -584,6 +587,7 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
}) => Self {
drop_during_sync: false,
work: Work::RpcBlock {
block_root,
block,
seen_timestamp,
process_type,
@@ -705,6 +709,7 @@ pub enum Work<T: BeaconChainTypes> {
seen_timestamp: Duration,
},
RpcBlock {
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
seen_timestamp: Duration,
process_type: BlockProcessType,
@@ -1532,11 +1537,13 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
* Verification for beacon blocks received during syncing via RPC.
*/
Work::RpcBlock {
block_root,
block,
seen_timestamp,
process_type,
should_process,
} => task_spawner.spawn_async(worker.process_rpc_block(
block_root,
block,
seen_timestamp,
process_type,

View File

@@ -242,6 +242,7 @@ impl TestRig {
pub fn enqueue_rpc_block(&self) {
let event = WorkEvent::rpc_beacon_block(
self.next_block.canonical_root(),
self.next_block.clone(),
std::time::Duration::default(),
BlockProcessType::ParentLookup {
@@ -253,6 +254,7 @@ impl TestRig {
pub fn enqueue_single_lookup_rpc_block(&self) {
let event = WorkEvent::rpc_beacon_block(
self.next_block.canonical_root(),
self.next_block.clone(),
std::time::Duration::default(),
BlockProcessType::SingleBlock { id: 1 },

View File

@@ -109,6 +109,7 @@ pub struct QueuedGossipBlock<T: BeaconChainTypes> {
/// A block that arrived for processing when the same block was being imported over gossip.
/// It is queued for later import.
pub struct QueuedRpcBlock<T: EthSpec> {
pub block_root: Hash256,
pub block: Arc<SignedBeaconBlock<T>>,
pub process_type: BlockProcessType,
pub seen_timestamp: Duration,

View File

@@ -713,16 +713,28 @@ impl<T: BeaconChainTypes> Worker<T> {
block_delay,
);
let verification_result = self
.chain
.clone()
.verify_block_for_gossip(block.clone())
.await;
let block_root = if let Ok(verified_block) = &verification_result {
verified_block.block_root
} else {
block.canonical_root()
};
// Write the time the block was observed into delay cache.
self.chain.block_times_cache.write().set_time_observed(
block.canonical_root(),
block_root,
block.slot(),
seen_duration,
Some(peer_id.to_string()),
Some(peer_client.to_string()),
);
let verified_block = match self.chain.clone().verify_block_for_gossip(block).await {
let verified_block = match verification_result {
Ok(verified_block) => {
if block_delay >= self.chain.slot_clock.unagg_attestation_production_delay() {
metrics::inc_counter(&metrics::BEACON_BLOCK_GOSSIP_ARRIVED_LATE_TOTAL);
@@ -762,9 +774,9 @@ impl<T: BeaconChainTypes> Worker<T> {
debug!(
self.log,
"Unknown parent for gossip block";
"root" => ?block.canonical_root()
"root" => ?block_root
);
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block, block_root));
return None;
}
Err(e @ BlockError::BeaconChainError(_)) => {
@@ -918,10 +930,11 @@ impl<T: BeaconChainTypes> Worker<T> {
_seen_duration: Duration,
) {
let block: Arc<_> = verified_block.block.clone();
let block_root = verified_block.block_root;
match self
.chain
.process_block(verified_block, CountUnrealized::True)
.process_block(block_root, verified_block, CountUnrealized::True)
.await
{
Ok(block_root) => {
@@ -956,7 +969,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"Block with unknown parent attempted to be processed";
"peer_id" => %peer_id
);
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block));
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block, block_root));
}
Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => {
debug!(
@@ -970,7 +983,7 @@ impl<T: BeaconChainTypes> Worker<T> {
self.log,
"Invalid gossip beacon block";
"outcome" => ?other,
"block root" => ?block.canonical_root(),
"block root" => ?block_root,
"block slot" => block.slot()
);
self.gossip_penalize_peer(

View File

@@ -38,8 +38,10 @@ struct ChainSegmentFailed {
impl<T: BeaconChainTypes> Worker<T> {
/// Attempt to process a block received from a direct RPC request.
#[allow(clippy::too_many_arguments)]
pub async fn process_rpc_block(
self,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
seen_timestamp: Duration,
process_type: BlockProcessType,
@@ -56,17 +58,18 @@ impl<T: BeaconChainTypes> Worker<T> {
return;
}
// Check if the block is already being imported through another source
let handle = match duplicate_cache.check_and_insert(block.canonical_root()) {
let handle = match duplicate_cache.check_and_insert(block_root) {
Some(handle) => handle,
None => {
debug!(
self.log,
"Gossip block is being processed";
"action" => "sending rpc block to reprocessing queue",
"block_root" => %block.canonical_root(),
"block_root" => %block_root,
);
// Send message to work reprocess queue to retry the block
let reprocess_msg = ReprocessQueueMessage::RpcBlock(QueuedRpcBlock {
block_root,
block: block.clone(),
process_type,
seen_timestamp,
@@ -74,13 +77,16 @@ impl<T: BeaconChainTypes> Worker<T> {
});
if reprocess_tx.try_send(reprocess_msg).is_err() {
error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %block.canonical_root())
error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %block_root)
};
return;
}
};
let slot = block.slot();
let result = self.chain.process_block(block, CountUnrealized::True).await;
let result = self
.chain
.process_block(block_root, block, CountUnrealized::True)
.await;
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);

View File

@@ -30,6 +30,8 @@ mod single_block_lookup;
#[cfg(test)]
mod tests;
pub type RootBlockTuple<T> = (Hash256, Arc<SignedBeaconBlock<T>>);
const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3;
@@ -101,11 +103,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// called in order to find the block's parent.
pub fn search_parent(
&mut self,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
) {
let block_root = block.canonical_root();
let parent_root = block.parent_root();
// If this block or it's parent is part of a known failed chain, ignore it.
if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) {
@@ -125,7 +127,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
return;
}
let parent_lookup = ParentLookup::new(block, peer_id);
let parent_lookup = ParentLookup::new(block_root, block, peer_id);
self.request_parent(parent_lookup, cx);
}
@@ -153,10 +155,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
};
match request.get_mut().verify_block(block) {
Ok(Some(block)) => {
Ok(Some((block_root, block))) => {
// This is the correct block, send it for processing
if self
.send_block_for_processing(
block_root,
block,
seen_timestamp,
BlockProcessType::SingleBlock { id },
@@ -217,11 +220,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
};
match parent_lookup.verify_block(block, &mut self.failed_chains) {
Ok(Some(block)) => {
Ok(Some((block_root, block))) => {
// Block is correct, send to the beacon processor.
let chain_hash = parent_lookup.chain_hash();
if self
.send_block_for_processing(
block_root,
block,
seen_timestamp,
BlockProcessType::ParentLookup { chain_hash },
@@ -420,7 +424,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e);
}
BlockError::ParentUnknown(block) => {
self.search_parent(block, peer_id, cx);
self.search_parent(root, block, peer_id, cx);
}
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
// These errors indicate that the execution layer is offline
@@ -625,6 +629,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
fn send_block_for_processing(
&mut self,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
duration: Duration,
process_type: BlockProcessType,
@@ -632,8 +637,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
) -> Result<(), ()> {
match cx.processor_channel_if_enabled() {
Some(beacon_processor_send) => {
trace!(self.log, "Sending block for processing"; "block" => %block.canonical_root(), "process" => ?process_type);
let event = WorkEvent::rpc_beacon_block(block, duration, process_type);
trace!(self.log, "Sending block for processing"; "block" => ?block_root, "process" => ?process_type);
let event = WorkEvent::rpc_beacon_block(block_root, block, duration, process_type);
if let Err(e) = beacon_processor_send.try_send(event) {
error!(
self.log,
@@ -646,7 +651,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
None => {
trace!(self.log, "Dropping block ready for processing. Beacon processor not available"; "block" => %block.canonical_root());
trace!(self.log, "Dropping block ready for processing. Beacon processor not available"; "block" => %block_root);
Err(())
}
}

View File

@@ -1,3 +1,4 @@
use super::RootBlockTuple;
use beacon_chain::BeaconChainTypes;
use lighthouse_network::PeerId;
use std::sync::Arc;
@@ -58,11 +59,15 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
.any(|d_block| d_block.as_ref() == block)
}
pub fn new(block: Arc<SignedBeaconBlock<T::EthSpec>>, peer_id: PeerId) -> Self {
pub fn new(
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
peer_id: PeerId,
) -> Self {
let current_parent_request = SingleBlockRequest::new(block.parent_root(), peer_id);
Self {
chain_hash: block.canonical_root(),
chain_hash: block_root,
downloaded_blocks: vec![block],
current_parent_request,
current_parent_request_id: None,
@@ -130,12 +135,15 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
&mut self,
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
failed_chains: &mut lru_cache::LRUTimeCache<Hash256>,
) -> Result<Option<Arc<SignedBeaconBlock<T::EthSpec>>>, VerifyError> {
let block = self.current_parent_request.verify_block(block)?;
) -> Result<Option<RootBlockTuple<T::EthSpec>>, VerifyError> {
let root_and_block = self.current_parent_request.verify_block(block)?;
// check if the parent of this block isn't in the failed cache. If it is, this chain should
// be dropped and the peer downscored.
if let Some(parent_root) = block.as_ref().map(|block| block.parent_root()) {
if let Some(parent_root) = root_and_block
.as_ref()
.map(|(_, block)| block.parent_root())
{
if failed_chains.contains(&parent_root) {
self.current_parent_request.register_failure_downloading();
self.current_parent_request_id = None;
@@ -143,7 +151,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
}
}
Ok(block)
Ok(root_and_block)
}
pub fn get_processing_peer(&self, chain_hash: Hash256) -> Option<PeerId> {

View File

@@ -1,6 +1,8 @@
use std::collections::HashSet;
use std::sync::Arc;
use super::RootBlockTuple;
use beacon_chain::get_block_root;
use lighthouse_network::{rpc::BlocksByRootRequest, PeerId};
use rand::seq::IteratorRandom;
use ssz_types::VariableList;
@@ -104,7 +106,7 @@ impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
pub fn verify_block<T: EthSpec>(
&mut self,
block: Option<Arc<SignedBeaconBlock<T>>>,
) -> Result<Option<Arc<SignedBeaconBlock<T>>>, VerifyError> {
) -> Result<Option<RootBlockTuple<T>>, VerifyError> {
match self.state {
State::AwaitingDownload => {
self.register_failure_downloading();
@@ -112,7 +114,10 @@ impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
}
State::Downloading { peer_id } => match block {
Some(block) => {
if block.canonical_root() != self.hash {
// Compute the block root using this specific function so that we can get timing
// metrics.
let block_root = get_block_root(&block);
if block_root != self.hash {
// return an error and drop the block
// NOTE: we take this is as a download failure to prevent counting the
// attempt as a chain failure, but simply a peer failure.
@@ -121,7 +126,7 @@ impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
} else {
// Return the block for processing.
self.state = State::Processing { peer_id };
Ok(Some(block))
Ok(Some((block_root, block)))
}
}
None => {

View File

@@ -272,7 +272,7 @@ fn test_parent_lookup_happy_path() {
let peer_id = PeerId::random();
// Trigger the request
bl.search_parent(Arc::new(block), peer_id, &mut cx);
bl.search_parent(chain_hash, Arc::new(block), peer_id, &mut cx);
let id = rig.expect_parent_request();
// Peer sends the right block, it should be sent for processing. Peer should not be penalized.
@@ -300,7 +300,7 @@ fn test_parent_lookup_wrong_response() {
let peer_id = PeerId::random();
// Trigger the request
bl.search_parent(Arc::new(block), peer_id, &mut cx);
bl.search_parent(chain_hash, Arc::new(block), peer_id, &mut cx);
let id1 = rig.expect_parent_request();
// Peer sends the wrong block, peer should be penalized and the block re-requested.
@@ -337,7 +337,7 @@ fn test_parent_lookup_empty_response() {
let peer_id = PeerId::random();
// Trigger the request
bl.search_parent(Arc::new(block), peer_id, &mut cx);
bl.search_parent(chain_hash, Arc::new(block), peer_id, &mut cx);
let id1 = rig.expect_parent_request();
// Peer sends an empty response, peer should be penalized and the block re-requested.
@@ -369,7 +369,7 @@ fn test_parent_lookup_rpc_failure() {
let peer_id = PeerId::random();
// Trigger the request
bl.search_parent(Arc::new(block), peer_id, &mut cx);
bl.search_parent(chain_hash, Arc::new(block), peer_id, &mut cx);
let id1 = rig.expect_parent_request();
// The request fails. It should be tried again.
@@ -396,10 +396,11 @@ fn test_parent_lookup_too_many_attempts() {
let parent = rig.rand_block();
let block = rig.block_with_parent(parent.canonical_root());
let chain_hash = block.canonical_root();
let peer_id = PeerId::random();
// Trigger the request
bl.search_parent(Arc::new(block), peer_id, &mut cx);
bl.search_parent(chain_hash, Arc::new(block), peer_id, &mut cx);
for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE {
let id = rig.expect_parent_request();
match i % 2 {
@@ -435,7 +436,7 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() {
let peer_id = PeerId::random();
// Trigger the request
bl.search_parent(Arc::new(block), peer_id, &mut cx);
bl.search_parent(block_hash, Arc::new(block), peer_id, &mut cx);
for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE {
assert!(!bl.failed_chains.contains(&block_hash));
let id = rig.expect_parent_request();
@@ -469,7 +470,7 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() {
let peer_id = PeerId::random();
// Trigger the request
bl.search_parent(Arc::new(block), peer_id, &mut cx);
bl.search_parent(block_hash, Arc::new(block), peer_id, &mut cx);
// Fail downloading the block
for _ in 0..(parent_lookup::PARENT_FAIL_TOLERANCE - PROCESSING_FAILURES) {
@@ -510,7 +511,7 @@ fn test_parent_lookup_too_deep() {
let peer_id = PeerId::random();
let trigger_block = blocks.pop().unwrap();
let chain_hash = trigger_block.canonical_root();
bl.search_parent(Arc::new(trigger_block), peer_id, &mut cx);
bl.search_parent(chain_hash, Arc::new(trigger_block), peer_id, &mut cx);
for block in blocks.into_iter().rev() {
let id = rig.expect_parent_request();
@@ -537,7 +538,12 @@ fn test_parent_lookup_disconnection() {
let (mut bl, mut cx, mut rig) = TestRig::test_setup(None);
let peer_id = PeerId::random();
let trigger_block = rig.rand_block();
bl.search_parent(Arc::new(trigger_block), peer_id, &mut cx);
bl.search_parent(
trigger_block.canonical_root(),
Arc::new(trigger_block),
peer_id,
&mut cx,
);
bl.peer_disconnected(&peer_id, &mut cx);
assert!(bl.parent_queue.is_empty());
}
@@ -581,7 +587,7 @@ fn test_parent_lookup_ignored_response() {
let peer_id = PeerId::random();
// Trigger the request
bl.search_parent(Arc::new(block), peer_id, &mut cx);
bl.search_parent(chain_hash, Arc::new(block), peer_id, &mut cx);
let id = rig.expect_parent_request();
// Peer sends the right block, it should be sent for processing. Peer should not be penalized.

View File

@@ -94,7 +94,7 @@ pub enum SyncMessage<T: EthSpec> {
},
/// A block with an unknown parent has been received.
UnknownBlock(PeerId, Arc<SignedBeaconBlock<T>>),
UnknownBlock(PeerId, Arc<SignedBeaconBlock<T>>, Hash256),
/// A peer has sent an object that references a block that is unknown. This triggers the
/// manager to attempt to find the block matching the unknown hash.
@@ -503,7 +503,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} => {
self.rpc_block_received(request_id, peer_id, beacon_block, seen_timestamp);
}
SyncMessage::UnknownBlock(peer_id, block) => {
SyncMessage::UnknownBlock(peer_id, block, block_root) => {
// If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore
if !self.network_globals.sync_state.read().is_synced() {
let head_slot = self.chain.canonical_head.cached_head().head_slot();
@@ -523,7 +523,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
&& self.network.is_execution_engine_online()
{
self.block_lookups
.search_parent(block, peer_id, &mut self.network);
.search_parent(block_root, block, peer_id, &mut self.network);
}
}
SyncMessage::UnknownBlockHash(peer_id, block_hash) => {