Blob syncing (#24)

* add a rt is_blob_batch

* use the mixed type everywhere

* glue

* more glue

* minor fixes

* fix range tests

* filling in the gaps

* moore filling in the gaps
This commit is contained in:
Divma
2022-11-24 07:45:38 -05:00
committed by GitHub
parent ce097ac8d2
commit bf5005244e
10 changed files with 706 additions and 280 deletions

View File

@@ -1,11 +1,11 @@
use crate::sync::manager::Id;
use crate::sync::manager::{BlockTy, Id};
use lighthouse_network::rpc::methods::BlocksByRangeRequest;
use lighthouse_network::PeerId;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::ops::Sub;
use std::sync::Arc;
use types::{Epoch, EthSpec, SignedBeaconBlock, Slot};
use types::{Epoch, EthSpec, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, Slot};
/// The number of times to retry a batch before it is considered failed.
const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5;
@@ -14,6 +14,22 @@ const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5;
/// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty.
const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3;
pub enum BatchTy<T: EthSpec> {
Blocks(Vec<Arc<SignedBeaconBlock<T>>>),
BlocksAndBlobs(Vec<SignedBeaconBlockAndBlobsSidecar<T>>),
}
/// Error representing a batch with mixed block types.
#[derive(Debug)]
pub struct MixedBlockTyErr;
/// Type of expected batch.
#[derive(Debug, Clone)]
pub enum ExpectedBatchTy {
OnlyBlockBlobs,
OnlyBlock,
}
/// Allows customisation of the above constants used in other sync methods such as BackFillSync.
pub trait BatchConfig {
/// The maximum batch download attempts.
@@ -47,7 +63,7 @@ pub trait BatchConfig {
/// Note that simpler hashing functions considered in the past (hash of first block, hash of last
/// block, number of received blocks) are not good enough to differentiate attempts. For this
/// reason, we hash the complete set of blocks both in RangeSync and BackFillSync.
fn batch_attempt_hash<T: EthSpec>(blocks: &[Arc<SignedBeaconBlock<T>>]) -> u64;
fn batch_attempt_hash<T: EthSpec>(blocks: &[BlockTy<T>]) -> u64;
}
pub struct RangeSyncBatchConfig {}
@@ -59,7 +75,7 @@ impl BatchConfig for RangeSyncBatchConfig {
fn max_batch_processing_attempts() -> u8 {
MAX_BATCH_PROCESSING_ATTEMPTS
}
fn batch_attempt_hash<T: EthSpec>(blocks: &[Arc<SignedBeaconBlock<T>>]) -> u64 {
fn batch_attempt_hash<T: EthSpec>(blocks: &[BlockTy<T>]) -> u64 {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
blocks.hash(&mut hasher);
hasher.finish()
@@ -96,6 +112,8 @@ pub struct BatchInfo<T: EthSpec, B: BatchConfig = RangeSyncBatchConfig> {
failed_download_attempts: Vec<PeerId>,
/// State of the batch.
state: BatchState<T>,
/// Whether this batch contains all blocks or all blocks and blobs.
batch_type: ExpectedBatchTy,
/// Pin the generic
marker: std::marker::PhantomData<B>,
}
@@ -105,9 +123,9 @@ pub enum BatchState<T: EthSpec> {
/// The batch has failed either downloading or processing, but can be requested again.
AwaitingDownload,
/// The batch is being downloaded.
Downloading(PeerId, Vec<Arc<SignedBeaconBlock<T>>>, Id),
Downloading(PeerId, Vec<BlockTy<T>>, Id),
/// The batch has been completely downloaded and is ready for processing.
AwaitingProcessing(PeerId, Vec<Arc<SignedBeaconBlock<T>>>),
AwaitingProcessing(PeerId, Vec<BlockTy<T>>),
/// The batch is being processed.
Processing(Attempt),
/// The batch was successfully processed and is waiting to be validated.
@@ -139,8 +157,13 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
/// Epoch boundary | |
/// ... | 30 | 31 | 32 | 33 | 34 | ... | 61 | 62 | 63 | 64 | 65 |
/// Batch 1 | Batch 2 | Batch 3
pub fn new(start_epoch: &Epoch, num_of_epochs: u64) -> Self {
let start_slot = start_epoch.start_slot(T::slots_per_epoch()) + 1;
///
/// NOTE: Removed the shift by one for eip4844 because otherwise the last batch before the blob
/// fork boundary will be of mixed type (all blocks and one last blockblob), and I don't want to
/// deal with this for now.
/// This means finalization might be slower in eip4844
pub fn new(start_epoch: &Epoch, num_of_epochs: u64, batch_type: ExpectedBatchTy) -> Self {
let start_slot = start_epoch.start_slot(T::slots_per_epoch());
let end_slot = start_slot + num_of_epochs * T::slots_per_epoch();
BatchInfo {
start_slot,
@@ -149,6 +172,7 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
failed_download_attempts: Vec::new(),
non_faulty_processing_attempts: 0,
state: BatchState::AwaitingDownload,
batch_type,
marker: std::marker::PhantomData,
}
}
@@ -201,11 +225,14 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
}
/// Returns a BlocksByRange request associated with the batch.
pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest {
BlocksByRangeRequest {
start_slot: self.start_slot.into(),
count: self.end_slot.sub(self.start_slot).into(),
}
pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ExpectedBatchTy) {
(
BlocksByRangeRequest {
start_slot: self.start_slot.into(),
count: self.end_slot.sub(self.start_slot).into(),
},
self.batch_type.clone(),
)
}
/// After different operations over a batch, this could be in a state that allows it to
@@ -231,7 +258,7 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
}
/// Adds a block to a downloading batch.
pub fn add_block(&mut self, block: Arc<SignedBeaconBlock<T>>) -> Result<(), WrongState> {
pub fn add_block(&mut self, block: BlockTy<T>) -> Result<(), WrongState> {
match self.state.poison() {
BatchState::Downloading(peer, mut blocks, req_id) => {
blocks.push(block);
@@ -363,11 +390,30 @@ impl<T: EthSpec, B: BatchConfig> BatchInfo<T, B> {
}
}
pub fn start_processing(&mut self) -> Result<Vec<Arc<SignedBeaconBlock<T>>>, WrongState> {
pub fn start_processing(&mut self) -> Result<BatchTy<T>, WrongState> {
match self.state.poison() {
BatchState::AwaitingProcessing(peer, blocks) => {
self.state = BatchState::Processing(Attempt::new::<B, T>(peer, &blocks));
Ok(blocks)
match self.batch_type {
ExpectedBatchTy::OnlyBlockBlobs => {
let blocks = blocks.into_iter().map(|block| {
let BlockTy::BlockAndBlob { block_sidecar_pair: block_and_blob } = block else {
panic!("Batches should never have a mixed type. This is a bug. Contact D")
};
block_and_blob
}).collect();
Ok(BatchTy::BlocksAndBlobs(blocks))
}
ExpectedBatchTy::OnlyBlock => {
let blocks = blocks.into_iter().map(|block| {
let BlockTy::Block { block } = block else {
panic!("Batches should never have a mixed type. This is a bug. Contact D")
};
block
}).collect();
Ok(BatchTy::Blocks(blocks))
}
}
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
@@ -461,10 +507,7 @@ pub struct Attempt {
}
impl Attempt {
fn new<B: BatchConfig, T: EthSpec>(
peer_id: PeerId,
blocks: &[Arc<SignedBeaconBlock<T>>],
) -> Self {
fn new<B: BatchConfig, T: EthSpec>(peer_id: PeerId, blocks: &[BlockTy<T>]) -> Self {
let hash = B::batch_attempt_hash(blocks);
Attempt { peer_id, hash }
}

View File

@@ -1,5 +1,7 @@
use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
use super::BatchTy;
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
use crate::sync::manager::BlockTy;
use crate::sync::{
manager::Id, network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult,
};
@@ -10,8 +12,7 @@ use rand::seq::SliceRandom;
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
use types::{Epoch, EthSpec, Hash256, Slot};
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
/// blocks per batch are requested _at most_. A batch may request less blocks to account for
@@ -19,7 +20,7 @@ use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
/// we will negatively report peers with poor bandwidth. This can be set arbitrarily high, in which
/// case the responder will fill the response up to the max request size, assuming they have the
/// bandwidth to do so.
pub const EPOCHS_PER_BATCH: u64 = 2;
pub const EPOCHS_PER_BATCH: u64 = 1;
/// The maximum number of batches to queue before requesting more.
const BATCH_BUFFER_SIZE: u8 = 5;
@@ -225,7 +226,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
batch_id: BatchId,
peer_id: &PeerId,
request_id: Id,
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
beacon_block: Option<BlockTy<T::EthSpec>>,
) -> ProcessingResult {
// check if we have this batch
let batch = match self.batches.get_mut(&batch_id) {
@@ -326,9 +327,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id, count_unrealized);
self.current_processing_batch = Some(batch_id);
if let Err(e) =
beacon_processor_send.try_send(BeaconWorkEvent::chain_segment(process_id, blocks))
{
let work_event = match blocks {
BatchTy::Blocks(blocks) => BeaconWorkEvent::chain_segment(process_id, blocks),
BatchTy::BlocksAndBlobs(blocks_and_blobs) => {
BeaconWorkEvent::blob_chain_segment(process_id, blocks_and_blobs)
}
};
if let Err(e) = beacon_processor_send.try_send(work_event) {
crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch",
"error" => %e, "batch" => self.processing_target);
// This is unlikely to happen but it would stall syncing since the batch now has no
@@ -897,8 +903,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
peer: PeerId,
) -> ProcessingResult {
if let Some(batch) = self.batches.get_mut(&batch_id) {
let request = batch.to_blocks_by_range_request();
match network.blocks_by_range_request(peer, request, self.id, batch_id) {
let (request, batch_type) = batch.to_blocks_by_range_request();
match network.blocks_by_range_request(peer, batch_type, request, self.id, batch_id) {
Ok(request_id) => {
// inform the batch about the new request
batch.start_downloading_from_peer(peer, request_id)?;
@@ -1002,7 +1008,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
if let Some(epoch) = self.optimistic_start {
if let Entry::Vacant(entry) = self.batches.entry(epoch) {
if let Some(peer) = idle_peers.pop() {
let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH);
let batch_type = network.batch_type(epoch);
let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type);
entry.insert(optimistic_batch);
self.send_batch(network, epoch, peer)?;
}
@@ -1011,7 +1018,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
while let Some(peer) = idle_peers.pop() {
if let Some(batch_id) = self.include_next_batch() {
if let Some(batch_id) = self.include_next_batch(network) {
// send the batch
self.send_batch(network, batch_id, peer)?;
} else {
@@ -1025,7 +1032,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Creates the next required batch from the chain. If there are no more batches required,
/// `false` is returned.
fn include_next_batch(&mut self) -> Option<BatchId> {
fn include_next_batch(&mut self, network: &mut SyncNetworkContext<T>) -> Option<BatchId> {
// don't request batches beyond the target head slot
if self
.to_be_downloaded
@@ -1059,10 +1066,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
Entry::Occupied(_) => {
// this batch doesn't need downloading, let this same function decide the next batch
self.to_be_downloaded += EPOCHS_PER_BATCH;
self.include_next_batch()
self.include_next_batch(network)
}
Entry::Vacant(entry) => {
entry.insert(BatchInfo::new(&batch_id, EPOCHS_PER_BATCH));
let batch_type = network.batch_type(batch_id);
entry.insert(BatchInfo::new(&batch_id, EPOCHS_PER_BATCH, batch_type));
self.to_be_downloaded += EPOCHS_PER_BATCH;
Some(batch_id)
}

View File

@@ -8,7 +8,10 @@ mod chain_collection;
mod range;
mod sync_type;
pub use batch::{BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState};
pub use batch::{
BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, BatchTy,
ExpectedBatchTy,
};
pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH};
pub use range::RangeSync;
pub use sync_type::RangeSyncType;

View File

@@ -44,7 +44,7 @@ use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain};
use super::chain_collection::ChainCollection;
use super::sync_type::RangeSyncType;
use crate::status::ToStatusMessage;
use crate::sync::manager::Id;
use crate::sync::manager::{BlockTy, Id};
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::BatchProcessResult;
use beacon_chain::{BeaconChain, BeaconChainTypes};
@@ -55,7 +55,7 @@ use lru_cache::LRUTimeCache;
use slog::{crit, debug, trace, warn};
use std::collections::HashMap;
use std::sync::Arc;
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
use types::{Epoch, EthSpec, Hash256, Slot};
/// For how long we store failed finalized chains to prevent retries.
const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30;
@@ -202,7 +202,7 @@ where
chain_id: ChainId,
batch_id: BatchId,
request_id: Id,
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
beacon_block: Option<BlockTy<T::EthSpec>>,
) {
// check if this chunk removes the chain
match self.chains.call_by_id(chain_id, |chain| {
@@ -372,6 +372,7 @@ where
#[cfg(test)]
mod tests {
use crate::service::RequestId;
use crate::sync::range_sync::ExpectedBatchTy;
use crate::NetworkMessage;
use super::*;
@@ -682,10 +683,13 @@ mod tests {
// add some peers
let (peer1, local_info, head_info) = rig.head_peer();
range.add_peer(&mut rig.cx, local_info, peer1, head_info);
let ((chain1, batch1), id1) = match rig.grab_request(&peer1).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => {
(rig.cx.range_sync_response(id, true).unwrap(), id)
}
let ((chain1, batch1, _), id1) = match rig.grab_request(&peer1).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => (
rig.cx
.range_sync_block_response(id, None, ExpectedBatchTy::OnlyBlock)
.unwrap(),
id,
),
other => panic!("unexpected request {:?}", other),
};
@@ -701,10 +705,13 @@ mod tests {
// while the ee is offline, more peers might arrive. Add a new finalized peer.
let (peer2, local_info, finalized_info) = rig.finalized_peer();
range.add_peer(&mut rig.cx, local_info, peer2, finalized_info);
let ((chain2, batch2), id2) = match rig.grab_request(&peer2).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => {
(rig.cx.range_sync_response(id, true).unwrap(), id)
}
let ((chain2, batch2, _), id2) = match rig.grab_request(&peer2).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => (
rig.cx
.range_sync_block_response(id, None, ExpectedBatchTy::OnlyBlock)
.unwrap(),
id,
),
other => panic!("unexpected request {:?}", other),
};