Rework block processing (#4092)

* introduce availability pending block

* add intoavailableblock trait

* small fixes

* add 'gossip blob cache' and start to clean up processing and transition types

* shard memory blob cache

* Initial commit

* Fix after rebase

* Add gossip verification conditions

* cache cleanup

* general chaos

* extended chaos

* cargo fmt

* more progress

* more progress

* tons of changes, just tryna compile

* everything, everywhere, all at once

* Reprocess an ExecutedBlock on unavailable blobs

* Add sus gossip verification for blobs

* Merge stuff

* Remove reprocessing cache stuff

* lint

* Add a wrapper to allow construction of only valid `AvailableBlock`s

* rename blob arc list to blob list

* merge cleanuo

* Revert "merge cleanuo"

This reverts commit 5e98326878.

* Revert "Revert "merge cleanuo""

This reverts commit 3a4009443a.

* fix rpc methods

* move beacon block and blob to eth2/types

* rename gossip blob cache to data availability checker

* lots of changes

* fix some compilation issues

* fix compilation issues

* fix compilation issues

* fix compilation issues

* fix compilation issues

* fix compilation issues

* cargo fmt

* use a common data structure for block import types

* fix availability check on proposal import

* refactor the blob cache and split the block wrapper into two types

* add type conversion for signed block and block wrapper

* fix beacon chain tests and do some renaming, add some comments

* Partial processing (#4)

* move beacon block and blob to eth2/types

* rename gossip blob cache to data availability checker

* lots of changes

* fix some compilation issues

* fix compilation issues

* fix compilation issues

* fix compilation issues

* fix compilation issues

* fix compilation issues

* cargo fmt

* use a common data structure for block import types

* fix availability check on proposal import

* refactor the blob cache and split the block wrapper into two types

* add type conversion for signed block and block wrapper

* fix beacon chain tests and do some renaming, add some comments

* cargo update (#6)

---------

Co-authored-by: realbigsean <sean@sigmaprime.io>
Co-authored-by: realbigsean <seananderson33@gmail.com>
This commit is contained in:
Pawan Dhananjay
2023-03-25 03:00:41 +05:30
committed by GitHub
parent 25a2d8f078
commit b276af98b7
46 changed files with 2167 additions and 1487 deletions

View File

@@ -41,7 +41,7 @@ num_cpus = "1.13.0"
lru_cache = { path = "../../common/lru_cache" }
if-addrs = "0.6.4"
strum = "0.24.0"
tokio-util = { version = "0.6.3", features = ["time"] }
tokio-util = { version = "0.7.7", features = ["time"] }
derivative = "2.2.0"
delay_map = "0.3.0"
ethereum-types = { version = "0.14.1", optional = true }

View File

@@ -449,7 +449,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
peer_id: PeerId,
peer_client: Client,
blob_index: u64,
signed_blob: Arc<SignedBlobSidecar<T::EthSpec>>,
signed_blob: SignedBlobSidecar<T::EthSpec>,
seen_timestamp: Duration,
) -> Self {
Self {
@@ -459,7 +459,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
peer_id,
peer_client,
blob_index,
signed_blob,
signed_blob: Box::new(signed_blob),
seen_timestamp,
},
}
@@ -729,7 +729,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
fn from(ready_work: ReadyWork<T>) -> Self {
match ready_work {
ReadyWork::Block(QueuedGossipBlock {
ReadyWork::GossipBlock(QueuedGossipBlock {
peer_id,
block,
seen_timestamp,
@@ -864,7 +864,7 @@ pub enum Work<T: BeaconChainTypes> {
peer_id: PeerId,
peer_client: Client,
blob_index: u64,
signed_blob: Arc<SignedBlobSidecar<T::EthSpec>>,
signed_blob: Box<SignedBlobSidecar<T::EthSpec>>,
seen_timestamp: Duration,
},
DelayedImportBlock {
@@ -1759,7 +1759,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
peer_id,
peer_client,
blob_index,
signed_blob,
*signed_blob,
seen_timestamp,
)
.await

View File

@@ -13,14 +13,15 @@
use super::MAX_SCHEDULED_WORK_QUEUE_LEN;
use crate::metrics;
use crate::sync::manager::BlockProcessType;
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::blob_verification::AsBlock;
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
use fnv::FnvHashMap;
use futures::task::Poll;
use futures::{Stream, StreamExt};
use lighthouse_network::{MessageId, PeerId};
use logging::TimeLatch;
use slog::{crit, debug, error, trace, warn, Logger};
use slog::{debug, error, trace, warn, Logger};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
@@ -28,7 +29,6 @@ use std::task::Context;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time::error::Error as TimeError;
use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey};
use types::{
Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, SubnetId,
@@ -87,7 +87,7 @@ pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
/// Events sent by the scheduler once they are ready for re-processing.
pub enum ReadyWork<T: BeaconChainTypes> {
Block(QueuedGossipBlock<T>),
GossipBlock(QueuedGossipBlock<T>),
RpcBlock(QueuedRpcBlock<T::EthSpec>),
Unaggregate(QueuedUnaggregate<T::EthSpec>),
Aggregate(QueuedAggregate<T::EthSpec>),
@@ -154,8 +154,6 @@ enum InboundEvent<T: BeaconChainTypes> {
ReadyAttestation(QueuedAttestationId),
/// A light client update that is ready for re-processing.
ReadyLightClientUpdate(QueuedLightClientUpdateId),
/// A `DelayQueue` returned an error.
DelayQueueError(TimeError, &'static str),
/// A message sent to the `ReprocessQueue`
Msg(ReprocessQueueMessage<T>),
}
@@ -233,54 +231,42 @@ impl<T: BeaconChainTypes> Stream for ReprocessQueue<T> {
// The sequential nature of blockchains means it is generally better to try and import all
// existing blocks before new ones.
match self.gossip_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(queued_block))) => {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::ReadyGossipBlock(
queued_block.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "gossip_block_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.rpc_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(queued_block))) => {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner())));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "rpc_block_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.attestations_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(attestation_id))) => {
Poll::Ready(Some(attestation_id)) => {
return Poll::Ready(Some(InboundEvent::ReadyAttestation(
attestation_id.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "attestations_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.lc_updates_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(lc_id))) => {
Poll::Ready(Some(lc_id)) => {
return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate(
lc_id.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "lc_updates_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
@@ -400,7 +386,7 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
if block_slot <= now
&& self
.ready_work_tx
.try_send(ReadyWork::Block(early_block))
.try_send(ReadyWork::GossipBlock(early_block))
.is_err()
{
error!(
@@ -694,7 +680,7 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
if self
.ready_work_tx
.try_send(ReadyWork::Block(ready_block))
.try_send(ReadyWork::GossipBlock(ready_block))
.is_err()
{
error!(
@@ -703,14 +689,7 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
);
}
}
InboundEvent::DelayQueueError(e, queue_name) => {
crit!(
log,
"Failed to poll queue";
"queue" => queue_name,
"e" => ?e
)
}
InboundEvent::ReadyAttestation(queued_id) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,

View File

@@ -1,6 +1,6 @@
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::blob_verification::{AsBlock, BlockWrapper, GossipVerifiedBlob};
use beacon_chain::store::Error;
use beacon_chain::{
attestation_verification::{self, Error as AttnError, VerifiedAttestation},
@@ -9,15 +9,14 @@ use beacon_chain::{
observed_operations::ObservationOutcome,
sync_committee_verification::{self, Error as SyncCommitteeError},
validator_monitor::get_block_delay_ms,
BeaconChainError, BeaconChainTypes, BlockError, CountUnrealized, ForkChoiceError,
GossipVerifiedBlock, NotifyExecutionLayer,
AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, CountUnrealized,
ForkChoiceError, GossipVerifiedBlock, NotifyExecutionLayer,
};
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
use operation_pool::ReceivedPreCapella;
use slog::{crit, debug, error, info, trace, warn};
use slot_clock::SlotClock;
use ssz::Encode;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::HotColdDBError;
use tokio::sync::mpsc;
@@ -654,19 +653,57 @@ impl<T: BeaconChainTypes> Worker<T> {
self,
_message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
_peer_client: Client,
blob_index: u64,
signed_blob: Arc<SignedBlobSidecar<T::EthSpec>>,
signed_blob: SignedBlobSidecar<T::EthSpec>,
_seen_duration: Duration,
) {
// TODO: gossip verification
crit!(self.log, "UNIMPLEMENTED gossip blob verification";
"peer_id" => %peer_id,
"client" => %peer_client,
"blob_topic" => blob_index,
"blob_index" => signed_blob.message.index,
"blob_slot" => signed_blob.message.slot
);
match self
.chain
.verify_blob_sidecar_for_gossip(signed_blob, blob_index)
{
Ok(gossip_verified_blob) => {
self.process_gossip_verified_blob(peer_id, gossip_verified_blob, _seen_duration)
.await
}
Err(_) => {
// TODO(pawan): handle all blob errors for peer scoring
todo!()
}
}
}
pub async fn process_gossip_verified_blob(
self,
peer_id: PeerId,
verified_blob: GossipVerifiedBlob<T::EthSpec>,
// This value is not used presently, but it might come in handy for debugging.
_seen_duration: Duration,
) {
// TODO
match self
.chain
.process_blob(verified_blob, CountUnrealized::True)
.await
{
Ok(AvailabilityProcessingStatus::Imported(_hash)) => {
todo!()
// add to metrics
// logging
}
Ok(AvailabilityProcessingStatus::PendingBlobs(pending_blobs)) => self
.send_sync_message(SyncMessage::UnknownBlobHash {
peer_id,
pending_blobs,
}),
Ok(AvailabilityProcessingStatus::PendingBlock(block_hash)) => {
self.send_sync_message(SyncMessage::UnknownBlockHash(peer_id, block_hash));
}
Err(_err) => {
// handle errors
todo!()
}
}
}
/// Process the beacon block received from the gossip network and:
@@ -802,6 +839,9 @@ impl<T: BeaconChainTypes> Worker<T> {
verified_block
}
Err(BlockError::AvailabilityCheck(_err)) => {
todo!()
}
Err(BlockError::ParentUnknown(block)) => {
debug!(
self.log,
@@ -984,7 +1024,7 @@ impl<T: BeaconChainTypes> Worker<T> {
)
.await
{
Ok(block_root) => {
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
if reprocess_tx
@@ -1011,6 +1051,24 @@ impl<T: BeaconChainTypes> Worker<T> {
self.chain.recompute_head_at_current_slot().await;
}
Ok(AvailabilityProcessingStatus::PendingBlock(block_root)) => {
// This error variant doesn't make any sense in this context
crit!(
self.log,
"Internal error. Cannot get AvailabilityProcessingStatus::PendingBlock on processing block";
"block_root" => %block_root
);
}
Ok(AvailabilityProcessingStatus::PendingBlobs(pending_blobs)) => {
// make rpc request for blob
self.send_sync_message(SyncMessage::UnknownBlobHash {
peer_id,
pending_blobs,
});
}
Err(BlockError::AvailabilityCheck(_)) => {
todo!()
}
Err(BlockError::ParentUnknown(block)) => {
// Inform the sync manager to find parents for this block
// This should not occur. It should be checked by `should_forward_block`

View File

@@ -1,5 +1,3 @@
use std::sync::Arc;
use crate::beacon_processor::{worker::FUTURE_SLOT_TOLERANCE, SendOnDrop};
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
@@ -15,7 +13,6 @@ use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo
use slog::{debug, error, warn};
use slot_clock::SlotClock;
use std::collections::{hash_map::Entry, HashMap};
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio_stream::StreamExt;
use types::blob_sidecar::BlobIdentifier;
@@ -840,7 +837,7 @@ impl<T: BeaconChainTypes> Worker<T> {
let mut send_response = true;
for root in block_roots {
match self.chain.get_blobs(&root, data_availability_boundary) {
match self.chain.get_blobs(&root) {
Ok(Some(blob_sidecar_list)) => {
for blob_sidecar in blob_sidecar_list.iter() {
blobs_sent += 1;

View File

@@ -7,8 +7,9 @@ use crate::beacon_processor::DuplicateCache;
use crate::metrics;
use crate::sync::manager::{BlockProcessType, SyncMessage};
use crate::sync::{BatchProcessResult, ChainId};
use beacon_chain::blob_verification::{AsBlock, BlockWrapper, IntoAvailableBlock};
use beacon_chain::CountUnrealized;
use beacon_chain::blob_verification::AsBlock;
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::{AvailabilityProcessingStatus, CountUnrealized};
use beacon_chain::{
BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError,
NotifyExecutionLayer,
@@ -86,28 +87,22 @@ impl<T: BeaconChainTypes> Worker<T> {
};
let slot = block.slot();
let parent_root = block.message().parent_root();
let available_block = block
.into_available_block(block_root, &self.chain)
.map_err(BlockError::BlobValidation);
let result = match available_block {
Ok(block) => {
self.chain
.process_block(
block_root,
block,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
}
Err(e) => Err(e),
};
let result = self
.chain
.process_block(
block_root,
block,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await;
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
// RPC block imported, regardless of process type
if let &Ok(hash) = &result {
//TODO(sean) handle pending availability variants
if let &Ok(AvailabilityProcessingStatus::Imported(hash)) = &result {
info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash);
// Trigger processing for work referencing this block.

View File

@@ -280,7 +280,6 @@ impl<T: BeaconChainTypes> Router<T> {
PubsubMessage::BlobSidecar(data) => {
let (blob_index, signed_blob) = *data;
let peer_client = self.network_globals.client(&peer_id);
let signed_blob = Arc::new(signed_blob);
self.send_beacon_processor_work(BeaconWorkEvent::gossip_signed_blob_sidecar(
message_id,
peer_id,

View File

@@ -2,7 +2,8 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::time::Duration;
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::blob_verification::AsBlock;
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::{BeaconChainTypes, BlockError};
use fnv::FnvHashMap;
use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode};

View File

@@ -1,5 +1,6 @@
use super::RootBlockTuple;
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::blob_verification::AsBlock;
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::BeaconChainTypes;
use lighthouse_network::PeerId;
use store::Hash256;

View File

@@ -1,5 +1,6 @@
use super::RootBlockTuple;
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::blob_verification::AsBlock;
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::get_block_root;
use lighthouse_network::{rpc::BlocksByRootRequest, PeerId};
use rand::seq::IteratorRandom;

View File

@@ -42,7 +42,8 @@ use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEven
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::range_sync::ByRangeRequestType;
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::blob_verification::AsBlock;
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState};
use futures::StreamExt;
use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS;
@@ -56,6 +57,7 @@ use std::ops::Sub;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot};
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
@@ -119,6 +121,13 @@ pub enum SyncMessage<T: EthSpec> {
/// manager to attempt to find the block matching the unknown hash.
UnknownBlockHash(PeerId, Hash256),
/// A peer has sent us a block that we haven't received all the blobs for. This triggers
/// the manager to attempt to find the pending blobs for the given block root.
UnknownBlobHash {
peer_id: PeerId,
pending_blobs: Vec<BlobIdentifier>,
},
/// A peer has disconnected.
Disconnect(PeerId),
@@ -598,6 +607,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
.search_block(block_hash, peer_id, &mut self.network);
}
}
SyncMessage::UnknownBlobHash { .. } => {
unimplemented!()
}
SyncMessage::Disconnect(peer_id) => {
self.peer_disconnect(&peer_id);
}