mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-11 04:31:51 +00:00
Some gossip work
This commit is contained in:
@@ -45,6 +45,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock};
|
||||
use derivative::Derivative;
|
||||
use futures::stream::{Stream, StreamExt};
|
||||
use futures::task::Poll;
|
||||
use lighthouse_network::rpc::methods::TxBlobsByRangeRequest;
|
||||
use lighthouse_network::{
|
||||
rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage},
|
||||
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
|
||||
@@ -60,9 +61,8 @@ use std::time::Duration;
|
||||
use std::{cmp, collections::HashSet};
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use lighthouse_network::rpc::methods::TxBlobsByRangeRequest;
|
||||
use types::{
|
||||
Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof,
|
||||
Attestation, AttesterSlashing, BlobWrapper, Hash256, ProposerSlashing, SignedAggregateAndProof,
|
||||
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId,
|
||||
SyncCommitteeMessage, SyncSubnetId,
|
||||
};
|
||||
@@ -113,6 +113,9 @@ const MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 1_024;
|
||||
/// before we start dropping them.
|
||||
const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024;
|
||||
|
||||
//FIXME(sean) verify
|
||||
const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024;
|
||||
|
||||
/// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but
|
||||
/// within acceptable clock disparity) that will be queued before we start dropping them.
|
||||
const MAX_DELAYED_BLOCK_QUEUE_LEN: usize = 1_024;
|
||||
@@ -187,6 +190,7 @@ pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch";
|
||||
pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate";
|
||||
pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch";
|
||||
pub const GOSSIP_BLOCK: &str = "gossip_block";
|
||||
pub const GOSSIP_BLOB: &str = "gossip_blob";
|
||||
pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block";
|
||||
pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit";
|
||||
pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing";
|
||||
@@ -404,6 +408,25 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn gossip_tx_blob_block(
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
peer_client: Client,
|
||||
blob: Box<BlobWrapper<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
drop_during_sync: false,
|
||||
work: Work::GossipBlob {
|
||||
message_id,
|
||||
peer_id,
|
||||
peer_client,
|
||||
blob,
|
||||
seen_timestamp,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new `Work` event for some sync committee signature.
|
||||
pub fn gossip_sync_signature(
|
||||
message_id: MessageId,
|
||||
@@ -694,6 +717,13 @@ pub enum Work<T: BeaconChainTypes> {
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
},
|
||||
GossipBlob {
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
peer_client: Client,
|
||||
blob: Box<BlobWrapper<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
},
|
||||
DelayedImportBlock {
|
||||
peer_id: PeerId,
|
||||
block: Box<GossipVerifiedBlock<T>>,
|
||||
@@ -768,6 +798,7 @@ impl<T: BeaconChainTypes> Work<T> {
|
||||
Work::GossipAggregate { .. } => GOSSIP_AGGREGATE,
|
||||
Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH,
|
||||
Work::GossipBlock { .. } => GOSSIP_BLOCK,
|
||||
Work::GossipBlob { .. } => GOSSIP_BLOB,
|
||||
Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK,
|
||||
Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT,
|
||||
Work::GossipProposerSlashing { .. } => GOSSIP_PROPOSER_SLASHING,
|
||||
@@ -918,6 +949,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
|
||||
let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
|
||||
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
|
||||
let mut gossip_blob_queue = FifoQueue::new(MAX_GOSSIP_BLOB_QUEUE_LEN);
|
||||
let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN);
|
||||
|
||||
let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN);
|
||||
@@ -1026,6 +1058,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
// required to verify some attestations.
|
||||
} else if let Some(item) = gossip_block_queue.pop() {
|
||||
self.spawn_worker(item, toolbox);
|
||||
//FIXME(sean)
|
||||
} else if let Some(item) = gossip_blob_queue.pop() {
|
||||
self.spawn_worker(item, toolbox);
|
||||
// Check the aggregates, *then* the unaggregates since we assume that
|
||||
// aggregates are more valuable to local validators and effectively give us
|
||||
// more information with less signature verification time.
|
||||
@@ -1232,6 +1267,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
Work::GossipBlock { .. } => {
|
||||
gossip_block_queue.push(work, work_id, &self.log)
|
||||
}
|
||||
Work::GossipBlob { .. } => {
|
||||
gossip_blob_queue.push(work, work_id, &self.log)
|
||||
}
|
||||
Work::DelayedImportBlock { .. } => {
|
||||
delayed_block_queue.push(work, work_id, &self.log)
|
||||
}
|
||||
@@ -1302,6 +1340,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_QUEUE_TOTAL,
|
||||
gossip_block_queue.len() as i64,
|
||||
);
|
||||
//FIXME(sean) blob metrics
|
||||
metrics::set_gauge(
|
||||
&metrics::BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL,
|
||||
rpc_block_queue.len() as i64,
|
||||
|
||||
@@ -18,9 +18,9 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use store::hot_cold_store::HotColdDBError;
|
||||
use tokio::sync::mpsc;
|
||||
use types::{
|
||||
Attestation, AttesterSlashing, EthSpec, Hash256, IndexedAttestation, ProposerSlashing,
|
||||
SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit,
|
||||
Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId,
|
||||
Attestation, AttesterSlashing, BlobWrapper, EthSpec, Hash256, IndexedAttestation,
|
||||
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof,
|
||||
SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId,
|
||||
};
|
||||
|
||||
use super::{
|
||||
@@ -692,6 +692,20 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn process_gossip_blob(
|
||||
self,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
peer_client: Client,
|
||||
blob: BlobWrapper<T::EthSpec>,
|
||||
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
|
||||
duplicate_cache: DuplicateCache,
|
||||
seen_duration: Duration,
|
||||
) {
|
||||
//FIXME(sean)
|
||||
}
|
||||
|
||||
/// Process the beacon block received from the gossip network and
|
||||
/// if it passes gossip propagation criteria, tell the network thread to forward it.
|
||||
///
|
||||
|
||||
Reference in New Issue
Block a user