mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-14 10:22:38 +00:00
Couple blocks and blobs in gossip (#3670)
* Revert "Add more gossip verification conditions" This reverts commit1430b561c3. * Revert "Add todos" This reverts commit91efb9d4c7. * Revert "Reprocess blob sidecar messages" This reverts commit21bf3d37cd. * Add the coupled topic * Decode SignedBeaconBlockAndBlobsSidecar correctly * Process Block and Blobs in beacon processor * Remove extra blob publishing logic from vc * Remove blob signing in vc * Ugly hack to compile
This commit is contained in:
@@ -46,6 +46,7 @@ use derivative::Derivative;
|
||||
use futures::stream::{Stream, StreamExt};
|
||||
use futures::task::Poll;
|
||||
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
|
||||
use lighthouse_network::SignedBeaconBlockAndBlobsSidecar;
|
||||
use lighthouse_network::{
|
||||
rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage},
|
||||
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
|
||||
@@ -61,7 +62,6 @@ use std::time::Duration;
|
||||
use std::{cmp, collections::HashSet};
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::sync::mpsc;
|
||||
use types::signed_blobs_sidecar::SignedBlobsSidecar;
|
||||
use types::{
|
||||
Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof,
|
||||
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId,
|
||||
@@ -80,8 +80,6 @@ mod worker;
|
||||
use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock;
|
||||
pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage};
|
||||
|
||||
use self::work_reprocessing_queue::QueuedBlobsSidecar;
|
||||
|
||||
/// The maximum size of the channel for work events to the `BeaconProcessor`.
|
||||
///
|
||||
/// Setting this too low will cause consensus messages to be dropped.
|
||||
@@ -117,9 +115,7 @@ const MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 1_024;
|
||||
const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024;
|
||||
|
||||
//FIXME(sean) verify
|
||||
const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024;
|
||||
//FIXME(sean) verify
|
||||
const MAX_BLOBS_SIDECAR_REPROCESS_QUEUE_LEN: usize = 1_024;
|
||||
const MAX_GOSSIP_BLOCK_AND_BLOB_QUEUE_LEN: usize = 1_024;
|
||||
|
||||
/// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but
|
||||
/// within acceptable clock disparity) that will be queued before we start dropping them.
|
||||
@@ -195,7 +191,7 @@ pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch";
|
||||
pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate";
|
||||
pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch";
|
||||
pub const GOSSIP_BLOCK: &str = "gossip_block";
|
||||
pub const GOSSIP_BLOBS_SIDECAR: &str = "gossip_blobs_sidecar";
|
||||
pub const GOSSIP_BLOCK_AND_BLOBS_SIDECAR: &str = "gossip_block_and_blobs_sidecar";
|
||||
pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block";
|
||||
pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit";
|
||||
pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing";
|
||||
@@ -210,7 +206,6 @@ pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
|
||||
pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request";
|
||||
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
|
||||
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
|
||||
pub const UNKNOWN_BLOBS_SIDECAR: &str = "unknown_blobs_sidecar";
|
||||
|
||||
/// A simple first-in-first-out queue with a maximum length.
|
||||
struct FifoQueue<T> {
|
||||
@@ -415,19 +410,20 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
|
||||
}
|
||||
|
||||
/// Create a new `Work` event for some blobs sidecar.
|
||||
pub fn gossip_blobs_sidecar(
|
||||
pub fn gossip_block_and_blobs_sidecar(
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
_peer_client: Client,
|
||||
blobs: Arc<SignedBlobsSidecar<T::EthSpec>>,
|
||||
peer_client: Client,
|
||||
block_and_blobs: Arc<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
drop_during_sync: false,
|
||||
work: Work::GossipBlobsSidecar {
|
||||
work: Work::GossipBlockAndBlobsSidecar {
|
||||
message_id,
|
||||
peer_id,
|
||||
blobs,
|
||||
peer_client,
|
||||
block_and_blobs,
|
||||
seen_timestamp,
|
||||
},
|
||||
}
|
||||
@@ -674,20 +670,6 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
|
||||
seen_timestamp,
|
||||
},
|
||||
},
|
||||
ReadyWork::BlobsSidecar(QueuedBlobsSidecar {
|
||||
peer_id,
|
||||
message_id,
|
||||
blobs_sidecar,
|
||||
seen_timestamp,
|
||||
}) => Self {
|
||||
drop_during_sync: true,
|
||||
work: Work::UnknownBlobsSidecar {
|
||||
message_id,
|
||||
peer_id,
|
||||
blobs: blobs_sidecar,
|
||||
seen_timestamp,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -737,16 +719,11 @@ pub enum Work<T: BeaconChainTypes> {
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
},
|
||||
GossipBlobsSidecar {
|
||||
GossipBlockAndBlobsSidecar {
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
blobs: Arc<SignedBlobsSidecar<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
},
|
||||
UnknownBlobsSidecar {
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
blobs: Arc<SignedBlobsSidecar<T::EthSpec>>,
|
||||
peer_client: Client,
|
||||
block_and_blobs: Arc<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
},
|
||||
DelayedImportBlock {
|
||||
@@ -823,7 +800,7 @@ impl<T: BeaconChainTypes> Work<T> {
|
||||
Work::GossipAggregate { .. } => GOSSIP_AGGREGATE,
|
||||
Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH,
|
||||
Work::GossipBlock { .. } => GOSSIP_BLOCK,
|
||||
Work::GossipBlobsSidecar { .. } => GOSSIP_BLOBS_SIDECAR,
|
||||
Work::GossipBlockAndBlobsSidecar { .. } => GOSSIP_BLOCK_AND_BLOBS_SIDECAR,
|
||||
Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK,
|
||||
Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT,
|
||||
Work::GossipProposerSlashing { .. } => GOSSIP_PROPOSER_SLASHING,
|
||||
@@ -838,7 +815,6 @@ impl<T: BeaconChainTypes> Work<T> {
|
||||
Work::BlobsByRangeRequest { .. } => BLOBS_BY_RANGE_REQUEST,
|
||||
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
|
||||
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
|
||||
Work::UnknownBlobsSidecar { .. } => UNKNOWN_BLOBS_SIDECAR,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -955,7 +931,6 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
LifoQueue::new(MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN);
|
||||
let mut unknown_block_attestation_queue =
|
||||
LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN);
|
||||
let mut unknown_blobs_sidecar_queue = LifoQueue::new(MAX_BLOBS_SIDECAR_REPROCESS_QUEUE_LEN);
|
||||
|
||||
let mut sync_message_queue = LifoQueue::new(MAX_SYNC_MESSAGE_QUEUE_LEN);
|
||||
let mut sync_contribution_queue = LifoQueue::new(MAX_SYNC_CONTRIBUTION_QUEUE_LEN);
|
||||
@@ -976,7 +951,8 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
|
||||
let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
|
||||
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
|
||||
let mut gossip_blobs_sidecar_queue = FifoQueue::new(MAX_GOSSIP_BLOB_QUEUE_LEN);
|
||||
let mut gossip_block_and_blobs_sidecar_queue =
|
||||
FifoQueue::new(MAX_GOSSIP_BLOCK_AND_BLOB_QUEUE_LEN);
|
||||
let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN);
|
||||
|
||||
let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN);
|
||||
@@ -1086,7 +1062,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
} else if let Some(item) = gossip_block_queue.pop() {
|
||||
self.spawn_worker(item, toolbox);
|
||||
//FIXME(sean)
|
||||
} else if let Some(item) = gossip_blobs_sidecar_queue.pop() {
|
||||
} else if let Some(item) = gossip_block_and_blobs_sidecar_queue.pop() {
|
||||
self.spawn_worker(item, toolbox);
|
||||
// Check the aggregates, *then* the unaggregates since we assume that
|
||||
// aggregates are more valuable to local validators and effectively give us
|
||||
@@ -1292,8 +1268,8 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
Work::GossipBlock { .. } => {
|
||||
gossip_block_queue.push(work, work_id, &self.log)
|
||||
}
|
||||
Work::GossipBlobsSidecar { .. } => {
|
||||
gossip_blobs_sidecar_queue.push(work, work_id, &self.log)
|
||||
Work::GossipBlockAndBlobsSidecar { .. } => {
|
||||
gossip_block_and_blobs_sidecar_queue.push(work, work_id, &self.log)
|
||||
}
|
||||
Work::DelayedImportBlock { .. } => {
|
||||
delayed_block_queue.push(work, work_id, &self.log)
|
||||
@@ -1337,9 +1313,6 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
Work::UnknownBlockAggregate { .. } => {
|
||||
unknown_block_aggregate_queue.push(work)
|
||||
}
|
||||
Work::UnknownBlobsSidecar { .. } => {
|
||||
unknown_blobs_sidecar_queue.push(work)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1556,19 +1529,22 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
/*
|
||||
* Verification for blobs sidecars received on gossip.
|
||||
*/
|
||||
Work::GossipBlobsSidecar {
|
||||
Work::GossipBlockAndBlobsSidecar {
|
||||
message_id,
|
||||
peer_id,
|
||||
blobs,
|
||||
peer_client,
|
||||
block_and_blobs,
|
||||
seen_timestamp,
|
||||
} => task_spawner.spawn_async(async move {
|
||||
worker.process_gossip_blob(
|
||||
message_id,
|
||||
peer_id,
|
||||
blobs,
|
||||
Some(work_reprocessing_tx),
|
||||
seen_timestamp,
|
||||
)
|
||||
worker
|
||||
.process_gossip_block_and_blobs_sidecar(
|
||||
message_id,
|
||||
peer_id,
|
||||
peer_client,
|
||||
block_and_blobs,
|
||||
seen_timestamp,
|
||||
)
|
||||
.await
|
||||
}),
|
||||
/*
|
||||
* Import for blocks that we received earlier than their intended slot.
|
||||
@@ -1755,14 +1731,6 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
seen_timestamp,
|
||||
)
|
||||
}),
|
||||
Work::UnknownBlobsSidecar {
|
||||
message_id,
|
||||
peer_id,
|
||||
blobs,
|
||||
seen_timestamp,
|
||||
} => task_spawner.spawn_blocking(move || {
|
||||
worker.process_gossip_blob(message_id, peer_id, blobs, None, seen_timestamp)
|
||||
}),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,10 +30,7 @@ use task_executor::TaskExecutor;
|
||||
use tokio::sync::mpsc::{self, Receiver, Sender};
|
||||
use tokio::time::error::Error as TimeError;
|
||||
use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey};
|
||||
use types::{
|
||||
Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobsSidecar,
|
||||
SubnetId,
|
||||
};
|
||||
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId};
|
||||
|
||||
const TASK_NAME: &str = "beacon_processor_reprocess_queue";
|
||||
const GOSSIP_BLOCKS: &str = "gossip_blocks";
|
||||
@@ -47,10 +44,6 @@ const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5);
|
||||
/// For how long to queue aggregated and unaggregated attestations for re-processing.
|
||||
pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);
|
||||
|
||||
/// For how long to queue blob sidecars for re-processing.
|
||||
/// TODO: rethink duration
|
||||
pub const QUEUED_BLOBS_SIDECARS_DELAY: Duration = Duration::from_secs(6);
|
||||
|
||||
/// For how long to queue rpc blocks before sending them back for reprocessing.
|
||||
pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3);
|
||||
|
||||
@@ -62,10 +55,6 @@ const MAXIMUM_QUEUED_BLOCKS: usize = 16;
|
||||
/// How many attestations we keep before new ones get dropped.
|
||||
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;
|
||||
|
||||
/// TODO: fix number
|
||||
/// How many blobs we keep before new ones get dropped.
|
||||
const MAXIMUM_QUEUED_BLOB_SIDECARS: usize = 16_384;
|
||||
|
||||
/// Messages that the scheduler can receive.
|
||||
pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
|
||||
/// A block that has been received early and we should queue for later processing.
|
||||
@@ -80,8 +69,6 @@ pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
|
||||
UnknownBlockUnaggregate(QueuedUnaggregate<T::EthSpec>),
|
||||
/// An aggregated attestation that references an unknown block.
|
||||
UnknownBlockAggregate(QueuedAggregate<T::EthSpec>),
|
||||
/// A blob sidecar that references an unknown block.
|
||||
UnknownBlobSidecar(QueuedBlobsSidecar<T::EthSpec>),
|
||||
}
|
||||
|
||||
/// Events sent by the scheduler once they are ready for re-processing.
|
||||
@@ -90,7 +77,6 @@ pub enum ReadyWork<T: BeaconChainTypes> {
|
||||
RpcBlock(QueuedRpcBlock<T::EthSpec>),
|
||||
Unaggregate(QueuedUnaggregate<T::EthSpec>),
|
||||
Aggregate(QueuedAggregate<T::EthSpec>),
|
||||
BlobsSidecar(QueuedBlobsSidecar<T::EthSpec>),
|
||||
}
|
||||
|
||||
/// An Attestation for which the corresponding block was not seen while processing, queued for
|
||||
@@ -132,15 +118,6 @@ pub struct QueuedRpcBlock<T: EthSpec> {
|
||||
pub should_process: bool,
|
||||
}
|
||||
|
||||
/// A blob sidecar for which the corresponding block was not seen while processing, queued for
|
||||
/// later.
|
||||
pub struct QueuedBlobsSidecar<T: EthSpec> {
|
||||
pub peer_id: PeerId,
|
||||
pub message_id: MessageId,
|
||||
pub blobs_sidecar: Arc<SignedBlobsSidecar<T>>,
|
||||
pub seen_timestamp: Duration,
|
||||
}
|
||||
|
||||
/// Unifies the different messages processed by the block delay queue.
|
||||
enum InboundEvent<T: BeaconChainTypes> {
|
||||
/// A gossip block that was queued for later processing and is ready for import.
|
||||
@@ -150,8 +127,6 @@ enum InboundEvent<T: BeaconChainTypes> {
|
||||
ReadyRpcBlock(QueuedRpcBlock<T::EthSpec>),
|
||||
/// An aggregated or unaggregated attestation is ready for re-processing.
|
||||
ReadyAttestation(QueuedAttestationId),
|
||||
/// A blob sidecar is ready for re-processing.
|
||||
ReadyBlobsSidecar(QueuedBlobsSidecarId),
|
||||
/// A `DelayQueue` returned an error.
|
||||
DelayQueueError(TimeError, &'static str),
|
||||
/// A message sent to the `ReprocessQueue`
|
||||
@@ -172,7 +147,6 @@ struct ReprocessQueue<T: BeaconChainTypes> {
|
||||
rpc_block_delay_queue: DelayQueue<QueuedRpcBlock<T::EthSpec>>,
|
||||
/// Queue to manage scheduled attestations.
|
||||
attestations_delay_queue: DelayQueue<QueuedAttestationId>,
|
||||
blobs_sidecar_delay_queue: DelayQueue<QueuedBlobsSidecarId>,
|
||||
|
||||
/* Queued items */
|
||||
/// Queued blocks.
|
||||
@@ -181,19 +155,15 @@ struct ReprocessQueue<T: BeaconChainTypes> {
|
||||
queued_aggregates: FnvHashMap<usize, (QueuedAggregate<T::EthSpec>, DelayKey)>,
|
||||
/// Queued attestations.
|
||||
queued_unaggregates: FnvHashMap<usize, (QueuedUnaggregate<T::EthSpec>, DelayKey)>,
|
||||
queued_blob_sidecars: FnvHashMap<usize, (QueuedBlobsSidecar<T::EthSpec>, DelayKey)>,
|
||||
/// Attestations (aggregated and unaggregated) per root.
|
||||
awaiting_attestations_per_root: HashMap<Hash256, Vec<QueuedAttestationId>>,
|
||||
awaiting_blobs_sidecars_per_root: HashMap<Hash256, Vec<QueuedBlobsSidecarId>>,
|
||||
|
||||
/* Aux */
|
||||
/// Next attestation id, used for both aggregated and unaggregated attestations
|
||||
next_attestation: usize,
|
||||
next_sidecar: usize,
|
||||
early_block_debounce: TimeLatch,
|
||||
rpc_block_debounce: TimeLatch,
|
||||
attestation_delay_debounce: TimeLatch,
|
||||
blobs_sidecar_debounce: TimeLatch,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
@@ -202,9 +172,6 @@ enum QueuedAttestationId {
|
||||
Unaggregate(usize),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
struct QueuedBlobsSidecarId(usize);
|
||||
|
||||
impl<T: EthSpec> QueuedAggregate<T> {
|
||||
pub fn beacon_block_root(&self) -> &Hash256 {
|
||||
&self.attestation.message.aggregate.data.beacon_block_root
|
||||
@@ -268,21 +235,6 @@ impl<T: BeaconChainTypes> Stream for ReprocessQueue<T> {
|
||||
Poll::Ready(None) | Poll::Pending => (),
|
||||
}
|
||||
|
||||
match self.blobs_sidecar_delay_queue.poll_expired(cx) {
|
||||
Poll::Ready(Some(Ok(id))) => {
|
||||
return Poll::Ready(Some(InboundEvent::ReadyBlobsSidecar(id.into_inner())));
|
||||
}
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
return Poll::Ready(Some(InboundEvent::DelayQueueError(
|
||||
e,
|
||||
"blobs_sidecar_queue",
|
||||
)));
|
||||
}
|
||||
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
|
||||
// will continue to get this result until something else is added into the queue.
|
||||
Poll::Ready(None) | Poll::Pending => (),
|
||||
}
|
||||
|
||||
// Last empty the messages channel.
|
||||
match self.work_reprocessing_rx.poll_recv(cx) {
|
||||
Poll::Ready(Some(message)) => return Poll::Ready(Some(InboundEvent::Msg(message))),
|
||||
@@ -312,19 +264,14 @@ pub fn spawn_reprocess_scheduler<T: BeaconChainTypes>(
|
||||
gossip_block_delay_queue: DelayQueue::new(),
|
||||
rpc_block_delay_queue: DelayQueue::new(),
|
||||
attestations_delay_queue: DelayQueue::new(),
|
||||
blobs_sidecar_delay_queue: DelayQueue::new(),
|
||||
queued_gossip_block_roots: HashSet::new(),
|
||||
queued_aggregates: FnvHashMap::default(),
|
||||
queued_unaggregates: FnvHashMap::default(),
|
||||
queued_blob_sidecars: FnvHashMap::default(),
|
||||
awaiting_attestations_per_root: HashMap::new(),
|
||||
awaiting_blobs_sidecars_per_root: HashMap::new(),
|
||||
next_attestation: 0,
|
||||
next_sidecar: 0,
|
||||
early_block_debounce: TimeLatch::default(),
|
||||
rpc_block_debounce: TimeLatch::default(),
|
||||
attestation_delay_debounce: TimeLatch::default(),
|
||||
blobs_sidecar_debounce: TimeLatch::default(),
|
||||
};
|
||||
|
||||
executor.spawn(
|
||||
@@ -526,39 +473,6 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
|
||||
|
||||
self.next_attestation += 1;
|
||||
}
|
||||
InboundEvent::Msg(UnknownBlobSidecar(queued_blob_sidecar)) => {
|
||||
if self.blobs_sidecar_delay_queue.len() >= MAXIMUM_QUEUED_BLOB_SIDECARS {
|
||||
if self.blobs_sidecar_debounce.elapsed() {
|
||||
error!(
|
||||
log,
|
||||
"Blobs sidecar queue is full";
|
||||
"queue_size" => MAXIMUM_QUEUED_BLOB_SIDECARS,
|
||||
"msg" => "check system clock"
|
||||
);
|
||||
}
|
||||
// Drop the attestation.
|
||||
return;
|
||||
}
|
||||
|
||||
let id = QueuedBlobsSidecarId(self.next_sidecar);
|
||||
|
||||
// Register the delay.
|
||||
let delay_key = self
|
||||
.blobs_sidecar_delay_queue
|
||||
.insert(id, QUEUED_BLOBS_SIDECARS_DELAY);
|
||||
|
||||
// Register this sidecar for the corresponding root.
|
||||
self.awaiting_blobs_sidecars_per_root
|
||||
.entry(queued_blob_sidecar.blobs_sidecar.message.beacon_block_root)
|
||||
.or_default()
|
||||
.push(id);
|
||||
|
||||
// Store the blob sidecar and its info.
|
||||
self.queued_blob_sidecars
|
||||
.insert(self.next_sidecar, (queued_blob_sidecar, delay_key));
|
||||
|
||||
self.next_sidecar += 1;
|
||||
}
|
||||
InboundEvent::Msg(BlockImported(root)) => {
|
||||
// Unqueue the attestations we have for this root, if any.
|
||||
if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&root) {
|
||||
@@ -603,43 +517,6 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Unqueue the blob sidecars we have for this root, if any.
|
||||
// TODO: merge the 2 data structures.
|
||||
if let Some(queued_ids) = self.awaiting_blobs_sidecars_per_root.remove(&root) {
|
||||
for id in queued_ids {
|
||||
// metrics::inc_counter(
|
||||
// &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS,
|
||||
// );
|
||||
|
||||
if let Some((work, delay_key)) = self
|
||||
.queued_blob_sidecars
|
||||
.remove(&id.0)
|
||||
.map(|(blobs_sidecar, delay_key)| {
|
||||
(ReadyWork::BlobsSidecar(blobs_sidecar), delay_key)
|
||||
})
|
||||
{
|
||||
// Remove the delay.
|
||||
self.blobs_sidecar_delay_queue.remove(&delay_key);
|
||||
|
||||
// Send the work.
|
||||
if self.ready_work_tx.try_send(work).is_err() {
|
||||
error!(
|
||||
log,
|
||||
"Failed to send scheduled blob sidecar";
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// There is a mismatch between the blob sidecar ids registered for this
|
||||
// root and the queued blob sidecars. This should never happen.
|
||||
error!(
|
||||
log,
|
||||
"Unknown queued blob sidecar for block root";
|
||||
"block_root" => ?root,
|
||||
"id" => ?id,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// A block that was queued for later processing is now ready to be processed.
|
||||
InboundEvent::ReadyGossipBlock(ready_block) => {
|
||||
@@ -714,40 +591,6 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
|
||||
}
|
||||
}
|
||||
}
|
||||
InboundEvent::ReadyBlobsSidecar(queued_blobs_sidecar_id) => {
|
||||
// metrics::inc_counter(
|
||||
// &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,
|
||||
// );
|
||||
|
||||
if let Some((root, work)) = self
|
||||
.queued_blob_sidecars
|
||||
.remove(&queued_blobs_sidecar_id.0)
|
||||
.map(|(blobs_sidecar, _delay_key)| {
|
||||
(
|
||||
blobs_sidecar.blobs_sidecar.message.beacon_block_root,
|
||||
ReadyWork::BlobsSidecar(blobs_sidecar),
|
||||
)
|
||||
})
|
||||
{
|
||||
if self.ready_work_tx.try_send(work).is_err() {
|
||||
error!(
|
||||
log,
|
||||
"Failed to send scheduled attestation";
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(queued_blob_sidecars) =
|
||||
self.awaiting_blobs_sidecars_per_root.get_mut(&root)
|
||||
{
|
||||
if let Some(index) = queued_blob_sidecars
|
||||
.iter()
|
||||
.position(|&id| id == queued_blobs_sidecar_id)
|
||||
{
|
||||
queued_blob_sidecars.swap_remove(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metrics::set_gauge_vec(
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use crate::beacon_processor::work_reprocessing_queue::QueuedBlobsSidecar;
|
||||
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
|
||||
|
||||
use beacon_chain::store::Error;
|
||||
@@ -11,7 +10,10 @@ use beacon_chain::{
|
||||
BeaconChainError, BeaconChainTypes, BlockError, CountUnrealized, ForkChoiceError,
|
||||
GossipVerifiedBlock,
|
||||
};
|
||||
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
|
||||
use lighthouse_network::{
|
||||
Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource,
|
||||
SignedBeaconBlockAndBlobsSidecar,
|
||||
};
|
||||
use slog::{crit, debug, error, info, trace, warn};
|
||||
use slot_clock::SlotClock;
|
||||
use ssz::Encode;
|
||||
@@ -19,11 +21,10 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use store::hot_cold_store::HotColdDBError;
|
||||
use tokio::sync::mpsc;
|
||||
use types::signed_blobs_sidecar::SignedBlobsSidecar;
|
||||
use types::{
|
||||
Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, ProposerSlashing,
|
||||
SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit,
|
||||
Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId,
|
||||
Attestation, AttesterSlashing, BlobsSidecar, EthSpec, Hash256, IndexedAttestation,
|
||||
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof,
|
||||
SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId,
|
||||
};
|
||||
|
||||
use super::{
|
||||
@@ -697,30 +698,15 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn process_gossip_blob(
|
||||
pub async fn process_gossip_block_and_blobs_sidecar(
|
||||
self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
blob: Arc<SignedBlobsSidecar<T::EthSpec>>,
|
||||
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
|
||||
peer_client: Client,
|
||||
block_and_blob: Arc<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
) {
|
||||
match self.chain.verify_blobs_sidecar_for_gossip(&blob) {
|
||||
//FIXME(sean)
|
||||
Ok(verified_sidecar) => {
|
||||
// Register with validator monitor
|
||||
// Propagate
|
||||
// Apply to fork choice
|
||||
}
|
||||
Err(error) => self.handle_blobs_verification_failure(
|
||||
peer_id,
|
||||
message_id,
|
||||
reprocess_tx,
|
||||
error,
|
||||
blob,
|
||||
seen_timestamp,
|
||||
),
|
||||
};
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
/// Process the beacon block received from the gossip network and
|
||||
@@ -2235,82 +2221,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
message_id: MessageId,
|
||||
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage<T>>>,
|
||||
error: BlobError,
|
||||
blobs_sidecar: Arc<SignedBlobsSidecar<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
) {
|
||||
// TODO: metrics
|
||||
match &error {
|
||||
BlobError::FutureSlot { .. } => {
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
|
||||
}
|
||||
BlobError::PastSlot { .. } => {
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
|
||||
}
|
||||
BlobError::BeaconChainError(_e) => {
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
|
||||
}
|
||||
BlobError::BlobOutOfRange { blob_index: _ } => {
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
|
||||
}
|
||||
BlobError::InvalidKZGCommitment => {
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
|
||||
}
|
||||
BlobError::ProposalSignatureInvalid => {
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
|
||||
}
|
||||
BlobError::RepeatSidecar {
|
||||
proposer: _,
|
||||
slot: _,
|
||||
} => {
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
|
||||
}
|
||||
BlobError::UnknownHeadBlock { beacon_block_root } => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Blob sidecar for unknown block";
|
||||
"peer_id" => %peer_id,
|
||||
"block" => ?beacon_block_root
|
||||
);
|
||||
if let Some(sender) = reprocess_tx {
|
||||
// We don't know the block, get the sync manager to handle the block lookup, and
|
||||
// send the attestation to be scheduled for re-processing.
|
||||
self.sync_tx
|
||||
.send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root))
|
||||
.unwrap_or_else(|_| {
|
||||
warn!(
|
||||
self.log,
|
||||
"Failed to send to sync service";
|
||||
"msg" => "UnknownBlockHash"
|
||||
)
|
||||
});
|
||||
let msg = ReprocessQueueMessage::UnknownBlobSidecar(QueuedBlobsSidecar {
|
||||
peer_id,
|
||||
message_id,
|
||||
blobs_sidecar,
|
||||
seen_timestamp,
|
||||
});
|
||||
|
||||
if sender.try_send(msg).is_err() {
|
||||
error!(
|
||||
self.log,
|
||||
"Failed to send blob sidecar for re-processing";
|
||||
)
|
||||
}
|
||||
} else {
|
||||
// We shouldn't make any further attempts to process this attestation.
|
||||
//
|
||||
// Don't downscore the peer since it's not clear if we requested this head
|
||||
// block from them or not.
|
||||
self.propagate_validation_result(
|
||||
message_id,
|
||||
peer_id,
|
||||
MessageAcceptance::Ignore,
|
||||
);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
&BlobError::UnknownValidator(_) => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -236,12 +236,12 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
block,
|
||||
);
|
||||
}
|
||||
PubsubMessage::BlobsSidecars(blobs) => {
|
||||
self.processor.on_blobs_gossip(
|
||||
PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs) => {
|
||||
self.processor.on_block_and_blobs_sidecar_gossip(
|
||||
id,
|
||||
peer_id,
|
||||
self.network_globals.client(&peer_id),
|
||||
blobs,
|
||||
block_and_blobs,
|
||||
);
|
||||
}
|
||||
PubsubMessage::VoluntaryExit(exit) => {
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::sync::manager::RequestId as SyncId;
|
||||
use crate::sync::SyncMessage;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
|
||||
use lighthouse_network::rpc::*;
|
||||
use lighthouse_network::{rpc::*, SignedBeaconBlockAndBlobsSidecar};
|
||||
use lighthouse_network::{
|
||||
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response,
|
||||
};
|
||||
@@ -17,7 +17,6 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use store::SyncCommitteeMessage;
|
||||
use tokio::sync::mpsc;
|
||||
use types::signed_blobs_sidecar::SignedBlobsSidecar;
|
||||
use types::{
|
||||
Attestation, AttesterSlashing, BlobsSidecar, EthSpec, ProposerSlashing,
|
||||
SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit,
|
||||
@@ -295,18 +294,18 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
))
|
||||
}
|
||||
|
||||
pub fn on_blobs_gossip(
|
||||
pub fn on_block_and_blobs_sidecar_gossip(
|
||||
&mut self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
peer_client: Client,
|
||||
blobs: Arc<SignedBlobsSidecar<T::EthSpec>>,
|
||||
block_and_blobs: Arc<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
|
||||
) {
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_blobs_sidecar(
|
||||
self.send_beacon_processor_work(BeaconWorkEvent::gossip_block_and_blobs_sidecar(
|
||||
message_id,
|
||||
peer_id,
|
||||
peer_client,
|
||||
blobs,
|
||||
block_and_blobs,
|
||||
timestamp_now(),
|
||||
))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user