Merge branch 'blob-syncing' of https://github.com/realbigsean/lighthouse into blob-sync-kzg

This commit is contained in:
realbigsean
2022-11-24 07:46:04 -05:00
10 changed files with 706 additions and 280 deletions

View File

@@ -148,6 +148,10 @@ const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;
/// be stored before we start dropping them.
const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64;
/// The maximum number of queued `Vec<[`SignedBeaconBlockAndBlobsSidecar`]>` objects received during syncing that will
/// be stored before we start dropping them.
const MAX_BLOB_CHAIN_SEGMENT_QUEUE_LEN: usize = 64;
/// The maximum number of queued `StatusMessage` objects received from the network RPC that will be
/// stored before we start dropping them.
const MAX_STATUS_QUEUE_LEN: usize = 1_024;
@@ -206,6 +210,7 @@ pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request";
pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request";
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const BLOB_CHAIN_SEGMENT: &str = "blob_chain_segment";
/// A simple first-in-first-out queue with a maximum length.
struct FifoQueue<T> {
@@ -546,6 +551,19 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
}
}
pub fn blob_chain_segment(
process_id: ChainSegmentProcessId,
blocks_and_blobs: Vec<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
) -> Self {
Self {
drop_during_sync: false,
work: Work::BlobChainSegment {
process_id,
blocks_and_blobs,
},
}
}
/// Create a new work event to process `StatusMessage`s from the RPC network.
pub fn status_message(peer_id: PeerId, message: StatusMessage) -> Self {
Self {
@@ -809,6 +827,10 @@ pub enum Work<T: BeaconChainTypes> {
request_id: PeerRequestId,
request: BlobsByRootRequest,
},
BlobChainSegment {
process_id: ChainSegmentProcessId,
blocks_and_blobs: Vec<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
},
}
impl<T: BeaconChainTypes> Work<T> {
@@ -836,6 +858,7 @@ impl<T: BeaconChainTypes> Work<T> {
Work::BlobsByRootsRequest { .. } => BLOBS_BY_ROOTS_REQUEST,
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Work::BlobChainSegment { .. } => BLOB_CHAIN_SEGMENT,
}
}
}
@@ -971,6 +994,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut blob_chain_segment_queue = FifoQueue::new(MAX_BLOB_CHAIN_SEGMENT_QUEUE_LEN);
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
let mut gossip_block_and_blobs_sidecar_queue =
FifoQueue::new(MAX_GOSSIP_BLOCK_AND_BLOB_QUEUE_LEN);
@@ -1072,6 +1096,11 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
self.spawn_worker(item, toolbox);
// Check sync blocks before gossip blocks, since we've already explicitly
// requested these blocks.
} else if let Some(item) = blob_chain_segment_queue.pop() {
self.spawn_worker(item, toolbox);
// Sync block and blob segments have the same priority as normal chain
// segments. This here might change depending on how batch processing
// evolves.
} else if let Some(item) = rpc_block_queue.pop() {
self.spawn_worker(item, toolbox);
// Check delayed blocks before gossip blocks, the gossip blocks might rely
@@ -1339,6 +1368,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
request_id,
request,
} => todo!(),
Work::BlobChainSegment { .. } => {
blob_chain_segment_queue.push(work, work_id, &self.log)
}
}
}
}
@@ -1775,6 +1807,14 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
seen_timestamp,
)
}),
Work::BlobChainSegment {
process_id,
blocks_and_blobs,
} => task_spawner.spawn_async(async move {
worker
.process_blob_chain_segment(process_id, blocks_and_blobs)
.await
}),
};
}
}

View File

@@ -15,7 +15,7 @@ use lighthouse_network::PeerAction;
use slog::{debug, error, info, warn};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{Epoch, Hash256, SignedBeaconBlock};
use types::{Epoch, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar};
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
@@ -241,6 +241,17 @@ impl<T: BeaconChainTypes> Worker<T> {
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
}
pub async fn process_blob_chain_segment(
&self,
sync_type: ChainSegmentProcessId,
downloaded_blocks: Vec<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
) {
warn!(self.log, "FAKE PROCESSING A BLOBS SEGMENT!!!");
let result = BatchProcessResult::Success {
was_non_empty: !downloaded_blocks.is_empty(),
};
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
}
/// Helper function to process blocks batches which only consumes the chain and blocks to process.
async fn process_blocks<'a>(
&self,