a lot more reprocessing work

This commit is contained in:
realbigsean
2023-03-31 09:09:56 -04:00
parent b78a6e8d1f
commit 8403402620
14 changed files with 365 additions and 151 deletions

View File

@@ -1939,8 +1939,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
blob_sidecar: SignedBlobSidecar<T::EthSpec>,
subnet_id: u64,
) -> Result<GossipVerifiedBlob<T::EthSpec>, BlobError> // TODO(pawan): make a GossipVerifedBlob type
{
) -> Result<GossipVerifiedBlob<T::EthSpec>, BlobError> {
blob_verification::validate_blob_sidecar_for_gossip(blob_sidecar, subnet_id, self)
}
@@ -2626,7 +2625,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
.await
{
Ok(_) => imported_blocks += 1,
Ok(status) => {
imported_blocks += 1;
match status {
AvailabilityProcessingStatus::Imported(_) => {
// The block was imported successfully.
}
AvailabilityProcessingStatus::PendingBlobs(blobs) => {}
AvailabilityProcessingStatus::PendingBlock(_) => {
// doesn't makes sense
}
}
}
Err(error) => {
return ChainSegmentResult::Failed {
imported_blocks,

View File

@@ -12,6 +12,7 @@ use crate::data_availability_checker::{
use crate::kzg_utils::{validate_blob, validate_blobs};
use crate::BeaconChainError;
use kzg::Kzg;
use types::blob_sidecar::BlobIdentifier;
use types::{
BeaconBlockRef, BeaconStateError, BlobSidecar, BlobSidecarList, Epoch, EthSpec, Hash256,
KzgCommitment, SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot,
@@ -136,9 +137,15 @@ pub struct GossipVerifiedBlob<T: EthSpec> {
}
impl<T: EthSpec> GossipVerifiedBlob<T> {
pub fn id(&self) -> BlobIdentifier {
self.blob.id()
}
pub fn block_root(&self) -> Hash256 {
self.blob.block_root
}
pub fn to_blob(self) -> Arc<BlobSidecar<T>> {
self.blob
}
}
pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
@@ -287,19 +294,14 @@ impl<T: EthSpec> KzgVerifiedBlob<T> {
}
pub fn verify_kzg_for_blob<T: EthSpec>(
blob: GossipVerifiedBlob<T>,
blob: Arc<BlobSidecar<T>>,
kzg: &Kzg,
) -> Result<KzgVerifiedBlob<T>, AvailabilityCheckError> {
//TODO(sean) remove clone
if validate_blob::<T>(
kzg,
blob.blob.blob.clone(),
blob.blob.kzg_commitment,
blob.blob.kzg_proof,
)
.map_err(AvailabilityCheckError::Kzg)?
if validate_blob::<T>(kzg, blob.blob.clone(), blob.kzg_commitment, blob.kzg_proof)
.map_err(AvailabilityCheckError::Kzg)?
{
Ok(KzgVerifiedBlob { blob: blob.blob })
Ok(KzgVerifiedBlob { blob })
} else {
Err(AvailabilityCheckError::KzgVerificationFailed)
}
@@ -449,6 +451,15 @@ pub enum BlockWrapper<E: EthSpec> {
BlockAndBlobs(Arc<SignedBeaconBlock<E>>, Vec<Arc<BlobSidecar<E>>>),
}
impl<E: EthSpec> BlockWrapper<E> {
pub fn deconstruct(self) -> (Arc<SignedBeaconBlock<E>>, Option<Vec<Arc<BlobSidecar<E>>>>) {
match self {
BlockWrapper::Block(block) => (block, None),
BlockWrapper::BlockAndBlobs(block, blobs) => (block, Some(blobs)),
}
}
}
impl<E: EthSpec> AsBlock<E> for BlockWrapper<E> {
fn slot(&self) -> Slot {
self.as_block().slot()

View File

@@ -63,6 +63,7 @@ pub struct DataAvailabilityChecker<T: EthSpec, S: SlotClock> {
struct GossipBlobCache<T: EthSpec> {
verified_blobs: Vec<KzgVerifiedBlob<T>>,
executed_block: Option<AvailabilityPendingExecutedBlock<T>>,
missing_blob_ids: Vec<BlobIdentifier>,
}
impl<T: EthSpec> GossipBlobCache<T> {
@@ -70,13 +71,16 @@ impl<T: EthSpec> GossipBlobCache<T> {
Self {
verified_blobs: vec![blob],
executed_block: None,
missing_blob_ids: vec![],
}
}
fn new_from_block(block: AvailabilityPendingExecutedBlock<T>) -> Self {
let missing_blob_ids = block.get_all_blob_ids();
Self {
verified_blobs: vec![],
executed_block: Some(block),
missing_blob_ids,
}
}
@@ -117,6 +121,22 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
self.rpc_blob_cache.read().get(blob_id).cloned()
}
pub fn put_rpc_blob(
&self,
blob: Arc<BlobSidecar<T>>,
) -> Result<Availability<T>, AvailabilityCheckError> {
// Verify the KZG commitment.
let kzg_verified_blob = if let Some(kzg) = self.kzg.as_ref() {
verify_kzg_for_blob(blob, kzg)?
} else {
return Err(AvailabilityCheckError::KzgNotInitialized);
};
self.put_kzg_verified_blob(kzg_verified_blob, |blob_id, missing_blob_ids| {
missing_blob_ids.contains(&blob_id)
})
}
/// This first validate the KZG commitments included in the blob sidecar.
/// Check if we've cached other blobs for this block. If it completes a set and we also
/// have a block cached, return the Availability variant triggering block import.
@@ -127,17 +147,23 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
&self,
gossip_blob: GossipVerifiedBlob<T>,
) -> Result<Availability<T>, AvailabilityCheckError> {
let block_root = gossip_blob.block_root();
// Verify the KZG commitments.
let kzg_verified_blob = if let Some(kzg) = self.kzg.as_ref() {
verify_kzg_for_blob(gossip_blob, kzg)?
verify_kzg_for_blob(gossip_blob.to_blob(), kzg)?
} else {
return Err(AvailabilityCheckError::KzgNotInitialized);
};
let blob = kzg_verified_blob.clone_blob();
self.put_kzg_verified_blob(kzg_verified_blob, |_, _| true)
}
fn put_kzg_verified_blob(
&self,
kzg_verified_blob: KzgVerifiedBlob<T>,
predicate: impl FnOnce(BlobIdentifier, &[BlobIdentifier]) -> bool,
) -> Result<Availability<T>, AvailabilityCheckError> {
let blob = kzg_verified_blob.clone_blob();
let blob_id = blob.id();
let mut blob_cache = self.gossip_blob_cache.lock();
// Gossip cache.
@@ -147,6 +173,10 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
// should filter duplicates, as well as validate indices.
let cache = occupied_entry.get_mut();
if !predicate(blob_id, cache.missing_blob_ids.as_slice()) {
// ignore this blob
}
cache
.verified_blobs
.insert(blob.index as usize, kzg_verified_blob);
@@ -154,7 +184,7 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
if let Some(executed_block) = cache.executed_block.take() {
self.check_block_availability_maybe_cache(occupied_entry, executed_block)?
} else {
Availability::PendingBlock(block_root)
Availability::PendingBlock(blob.block_root)
}
}
Entry::Vacant(vacant_entry) => {
@@ -169,9 +199,8 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
if let Some(blob_ids) = availability.get_available_blob_ids() {
self.prune_rpc_blob_cache(&blob_ids);
} else {
self.rpc_blob_cache.write().insert(blob.id(), blob.clone());
self.rpc_blob_cache.write().insert(blob_id, blob);
}
Ok(availability)
}
@@ -233,6 +262,7 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
.get_filtered_blob_ids(|index| cache.verified_blobs.get(index).is_none());
let _ = cache.executed_block.insert(executed_block);
cache.missing_blob_ids = missing_blob_ids.clone();
Ok(Availability::PendingBlobs(missing_blob_ids))
}

View File

@@ -64,10 +64,10 @@ use std::{cmp, collections::HashSet};
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate,
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobSidecar,
SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId,
SyncCommitteeMessage, SyncSubnetId,
Attestation, AttesterSlashing, BlobSidecar, Hash256, LightClientFinalityUpdate,
LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock,
SignedBlobSidecar, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit,
SubnetId, SyncCommitteeMessage, SyncSubnetId,
};
use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
@@ -119,7 +119,7 @@ const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024;
/// The maximum number of queued `SignedBeaconBlockAndBlobsSidecar` objects received on gossip that
/// will be stored before we start dropping them.
const MAX_GOSSIP_BLOCK_AND_BLOB_QUEUE_LEN: usize = 1_024;
const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024;
/// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but
/// within acceptable clock disparity) that will be queued before we start dropping them.
@@ -160,6 +160,7 @@ const MAX_SYNC_CONTRIBUTION_QUEUE_LEN: usize = 1024;
/// The maximum number of queued `SignedBeaconBlock` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;
const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024 * 4; // TODO(sean) make function of max blobs per block? or is this just too big?
/// The maximum number of queued `Vec<SignedBeaconBlock>` objects received during syncing that will
/// be stored before we start dropping them.
@@ -229,6 +230,7 @@ pub const GOSSIP_SYNC_CONTRIBUTION: &str = "gossip_sync_contribution";
pub const GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update";
pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";
pub const RPC_BLOCK: &str = "rpc_block";
pub const RPC_BLOB: &str = "rpc_blob";
pub const CHAIN_SEGMENT: &str = "chain_segment";
pub const STATUS_PROCESSING: &str = "status_processing";
pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request";
@@ -607,7 +609,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
/// sent to the other side of `result_tx`.
pub fn rpc_beacon_block(
block_root: Hash256,
block: BlockWrapper<T::EthSpec>,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> Self {
@@ -623,6 +625,21 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
}
}
pub fn rpc_blob(
blob: Arc<BlobSidecar<T::EthSpec>>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> Self {
Self {
drop_during_sync: false,
work: Work::RpcBlob {
block: blob,
seen_timestamp,
process_type,
},
}
}
/// Create a new work event to import `blocks` as a beacon chain segment.
pub fn chain_segment(
process_id: ChainSegmentProcessId,
@@ -914,11 +931,16 @@ pub enum Work<T: BeaconChainTypes> {
},
RpcBlock {
block_root: Hash256,
block: BlockWrapper<T::EthSpec>,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
seen_timestamp: Duration,
process_type: BlockProcessType,
should_process: bool,
},
RpcBlob {
block: Arc<BlobSidecar<T::EthSpec>>,
seen_timestamp: Duration,
process_type: BlockProcessType,
},
ChainSegment {
process_id: ChainSegmentProcessId,
blocks: Vec<BlockWrapper<T::EthSpec>>,
@@ -978,6 +1000,7 @@ impl<T: BeaconChainTypes> Work<T> {
Work::GossipLightClientFinalityUpdate { .. } => GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE,
Work::GossipLightClientOptimisticUpdate { .. } => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
Work::RpcBlock { .. } => RPC_BLOCK,
Work::RpcBlob { .. } => RPC_BLOB,
Work::ChainSegment { .. } => CHAIN_SEGMENT,
Work::Status { .. } => STATUS_PROCESSING,
Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST,
@@ -1128,11 +1151,11 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
// Using a FIFO queue since blocks need to be imported sequentially.
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN);
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
let mut gossip_block_and_blobs_sidecar_queue =
FifoQueue::new(MAX_GOSSIP_BLOCK_AND_BLOB_QUEUE_LEN);
let mut gossip_blob_queue = FifoQueue::new(MAX_GOSSIP_BLOB_QUEUE_LEN);
let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN);
let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN);
@@ -1239,6 +1262,8 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
// evolves.
} else if let Some(item) = rpc_block_queue.pop() {
self.spawn_worker(item, toolbox);
} else if let Some(item) = rpc_blob_queue.pop() {
self.spawn_worker(item, toolbox);
// Check delayed blocks before gossip blocks, the gossip blocks might rely
// on the delayed ones.
} else if let Some(item) = delayed_block_queue.pop() {
@@ -1247,7 +1272,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
// required to verify some attestations.
} else if let Some(item) = gossip_block_queue.pop() {
self.spawn_worker(item, toolbox);
} else if let Some(item) = gossip_block_and_blobs_sidecar_queue.pop() {
} else if let Some(item) = gossip_blob_queue.pop() {
self.spawn_worker(item, toolbox);
// Check the aggregates, *then* the unaggregates since we assume that
// aggregates are more valuable to local validators and effectively give us
@@ -1463,7 +1488,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
gossip_block_queue.push(work, work_id, &self.log)
}
Work::GossipSignedBlobSidecar { .. } => {
gossip_block_and_blobs_sidecar_queue.push(work, work_id, &self.log)
gossip_blob_queue.push(work, work_id, &self.log)
}
Work::DelayedImportBlock { .. } => {
delayed_block_queue.push(work, work_id, &self.log)
@@ -1488,6 +1513,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
optimistic_update_queue.push(work, work_id, &self.log)
}
Work::RpcBlock { .. } => rpc_block_queue.push(work, work_id, &self.log),
Work::RpcBlob { .. } => rpc_blob_queue.push(work, work_id, &self.log),
Work::ChainSegment { ref process_id, .. } => match process_id {
ChainSegmentProcessId::RangeBatchId { .. }
| ChainSegmentProcessId::ParentLookup { .. } => {
@@ -1557,6 +1583,10 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
&metrics::BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL,
rpc_block_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL,
rpc_blob_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL,
chain_segment_queue.len() as i64,
@@ -1906,6 +1936,15 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
duplicate_cache,
should_process,
)),
Work::RpcBlob {
block,
seen_timestamp,
process_type,
} => task_spawner.spawn_async(worker.process_rpc_blob(
block,
seen_timestamp,
process_type,
)),
/*
* Verification for a chain segment (multiple blocks).
*/

View File

@@ -14,7 +14,6 @@ use super::MAX_SCHEDULED_WORK_QUEUE_LEN;
use crate::metrics;
use crate::sync::manager::BlockProcessType;
use beacon_chain::blob_verification::AsBlock;
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
use fnv::FnvHashMap;
use futures::task::Poll;
@@ -25,13 +24,15 @@ use slog::{debug, error, trace, warn, Logger};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey};
use types::{
Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, SubnetId,
Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof,
SignedBeaconBlock, SubnetId,
};
const TASK_NAME: &str = "beacon_processor_reprocess_queue";
@@ -135,7 +136,7 @@ pub struct QueuedGossipBlock<T: BeaconChainTypes> {
/// It is queued for later import.
pub struct QueuedRpcBlock<T: EthSpec> {
pub block_root: Hash256,
pub block: BlockWrapper<T>,
pub block: Arc<SignedBeaconBlock<T>>,
pub process_type: BlockProcessType,
pub seen_timestamp: Duration,
/// Indicates if the beacon chain should process this block or not.

View File

@@ -18,7 +18,7 @@ use lighthouse_network::PeerAction;
use slog::{debug, error, info, warn};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{Epoch, Hash256, SignedBeaconBlock};
use types::{BlobSidecar, Epoch, Hash256, SignedBeaconBlock};
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
@@ -45,7 +45,7 @@ impl<T: BeaconChainTypes> Worker<T> {
pub async fn process_rpc_block(
self,
block_root: Hash256,
block: BlockWrapper<T::EthSpec>,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
seen_timestamp: Duration,
process_type: BlockProcessType,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
@@ -136,6 +136,27 @@ impl<T: BeaconChainTypes> Worker<T> {
drop(handle);
}
pub async fn process_rpc_blob(
self,
blob: Arc<BlobSidecar<T::EthSpec>>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) {
let result = self
.chain
.check_availability_and_maybe_import(
|chain| chain.data_availability_checker.put_rpc_blob(blob),
CountUnrealized::True,
)
.await;
// Sync handles these results
self.send_sync_message(SyncMessage::BlobProcessed {
process_type,
result,
});
}
/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
/// thread if more blocks are needed to process it.
pub async fn process_chain_segment(

View File

@@ -451,8 +451,8 @@ impl<T: BeaconChainTypes> Router<T> {
}
id @ (SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::BackFillBlobs { .. }
| SyncId::RangeBlobs { .. }) => id,
| SyncId::BackFillBlockAndBlobs { .. }
| SyncId::RangeBlockAndBlobs { .. }) => id,
},
RequestId::Router => unreachable!("All BBRange requests belong to sync"),
};
@@ -510,8 +510,8 @@ impl<T: BeaconChainTypes> Router<T> {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::RangeBlobs { .. }
| SyncId::BackFillBlobs { .. } => {
| SyncId::RangeBlockAndBlobs { .. }
| SyncId::BackFillBlockAndBlobs { .. } => {
unreachable!("Batch syncing do not request BBRoot requests")
}
},
@@ -543,8 +543,8 @@ impl<T: BeaconChainTypes> Router<T> {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::RangeBlobs { .. }
| SyncId::BackFillBlobs { .. } => {
| SyncId::RangeBlockAndBlobs { .. }
| SyncId::BackFillBlockAndBlobs { .. } => {
unreachable!("Batch syncing does not request BBRoot requests")
}
},

View File

@@ -1,11 +1,12 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use beacon_chain::blob_verification::AsBlock;
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::{BeaconChainTypes, BlockError};
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
use fnv::FnvHashMap;
use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode};
use lighthouse_network::{PeerAction, PeerId};
@@ -14,6 +15,7 @@ use slog::{debug, error, trace, warn, Logger};
use smallvec::SmallVec;
use store::Hash256;
use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, SignedBeaconBlock};
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent};
use crate::metrics;
@@ -36,7 +38,7 @@ mod single_block_lookup;
#[cfg(test)]
mod tests;
pub type RootBlockTuple<T> = (Hash256, BlockWrapper<T>);
pub type RootBlockTuple<T> = (Hash256, Arc<SignedBeaconBlock<T>>);
const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3;
@@ -145,6 +147,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx: &mut SyncNetworkContext<T>,
) {
//TODO(sean) handle delay
//TODO(sean) cannot use peer id here cause it assumes it has the block, this is from gossip so not true
self.search_block(hash, peer_id, cx);
}
@@ -206,7 +209,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
id: Id,
peer_id: PeerId,
block: Option<BlockWrapper<T::EthSpec>>,
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
seen_timestamp: Duration,
cx: &mut SyncNetworkContext<T>,
) {
@@ -271,7 +274,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
id: Id,
peer_id: PeerId,
block: Option<BlockWrapper<T::EthSpec>>,
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
seen_timestamp: Duration,
cx: &mut SyncNetworkContext<T>,
) {
@@ -349,6 +352,28 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
}
pub fn single_lookup_blob_response(
&mut self,
id: Id,
peer_id: PeerId,
block: Option<Arc<BlobSidecar<T::EthSpec>>>,
seen_timestamp: Duration,
cx: &mut SyncNetworkContext<T>,
) {
todo!()
}
pub fn parent_lookup_blob_response(
&mut self,
id: Id,
peer_id: PeerId,
block: Option<Arc<BlobSidecar<T::EthSpec>>>,
seen_timestamp: Duration,
cx: &mut SyncNetworkContext<T>,
) {
todo!()
}
/* Error responses */
#[allow(clippy::needless_collect)] // false positive
@@ -472,11 +497,18 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
};
match result {
BlockProcessResult::Ok => {
trace!(self.log, "Single block processing succeeded"; "block" => %root);
}
BlockProcessResult::MissingBlobs(blobs) => {
todo!()
BlockProcessResult::Ok(status) => {
match status {
AvailabilityProcessingStatus::Imported(hash) => {
trace!(self.log, "Single block processing succeeded"; "block" => %root);
}
AvailabilityProcessingStatus::PendingBlobs(blobs) => {
// trigger?
}
AvailabilityProcessingStatus::PendingBlock(hash) => {
// logic error
}
}
}
BlockProcessResult::Ignored => {
// Beacon processor signalled to ignore the block processing result.
@@ -558,11 +590,18 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
};
match &result {
BlockProcessResult::Ok => {
trace!(self.log, "Parent block processing succeeded"; &parent_lookup)
}
BlockProcessResult::MissingBlobs(blobs) => {
todo!()
BlockProcessResult::Ok(status) => {
match status {
AvailabilityProcessingStatus::Imported(hash) => {
trace!(self.log, "Parent block processing succeeded"; &parent_lookup)
}
AvailabilityProcessingStatus::PendingBlobs(blobs) => {
// trigger?
}
AvailabilityProcessingStatus::PendingBlock(hash) => {
// logic error
}
}
}
BlockProcessResult::Err(e) => {
trace!(self.log, "Parent block processing failed"; &parent_lookup, "error" => %e)
@@ -578,8 +617,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
match result {
BlockProcessResult::MissingBlobs(blobs) => {
todo!()
BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlock(_)) => {
// doesn't make sense
}
BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlobs(blobs)) => {
// trigger
}
BlockProcessResult::Err(BlockError::ParentUnknown(block)) => {
// need to keep looking for parents
@@ -587,7 +629,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
parent_lookup.add_block(block);
self.request_parent(parent_lookup, cx);
}
BlockProcessResult::Ok
BlockProcessResult::Ok(AvailabilityProcessingStatus::Imported(_))
| BlockProcessResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => {
// Check if the beacon processor is available
let beacon_processor_send = match cx.processor_channel_if_enabled() {
@@ -666,6 +708,24 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
}
pub fn single_blob_processed(
&mut self,
id: Id,
result: BlockProcessResult<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) {
todo!()
}
pub fn parent_blob_processed(
&mut self,
chain_hash: Hash256,
result: BlockProcessResult<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) {
todo!()
}
pub fn parent_chain_processed(
&mut self,
chain_hash: Hash256,
@@ -709,7 +769,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
fn send_block_for_processing(
&mut self,
block_root: Hash256,
block: BlockWrapper<T::EthSpec>,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
duration: Duration,
process_type: BlockProcessType,
cx: &mut SyncNetworkContext<T>,

View File

@@ -7,8 +7,10 @@ use beacon_chain::blob_verification::AsBlock;
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::BeaconChainTypes;
use lighthouse_network::PeerId;
use std::sync::Arc;
use store::Hash256;
use strum::IntoStaticStr;
use types::{BlobSidecar, SignedBeaconBlock};
use super::single_block_lookup::{self, SingleBlockRequest};
@@ -25,6 +27,7 @@ pub(crate) struct ParentLookup<T: BeaconChainTypes> {
chain_hash: Hash256,
/// The blocks that have currently been downloaded.
downloaded_blocks: Vec<RootBlockTuple<T::EthSpec>>,
downloaded_blobs: Vec<Option<Vec<Arc<BlobSidecar<T::EthSpec>>>>>,
/// Request of the last parent.
current_parent_request: SingleBlockRequest<PARENT_FAIL_TOLERANCE>,
/// Id of the last parent request.
@@ -59,12 +62,18 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
.any(|(root, _d_block)| root == block_root)
}
pub fn new(block_root: Hash256, block: BlockWrapper<T::EthSpec>, peer_id: PeerId) -> Self {
pub fn new(
block_root: Hash256,
block_wrapper: BlockWrapper<T::EthSpec>,
peer_id: PeerId,
) -> Self {
let (block, blobs) = block_wrapper.deconstruct();
let current_parent_request = SingleBlockRequest::new(block.parent_root(), peer_id);
Self {
chain_hash: block_root,
downloaded_blocks: vec![(block_root, block)],
downloaded_blobs: vec![blobs],
current_parent_request,
current_parent_request_id: None,
}
@@ -94,10 +103,12 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
self.current_parent_request.check_peer_disconnected(peer_id)
}
pub fn add_block(&mut self, block: BlockWrapper<T::EthSpec>) {
let next_parent = block.parent_root();
pub fn add_block(&mut self, block_wrapper: BlockWrapper<T::EthSpec>) {
let next_parent = block_wrapper.parent_root();
let current_root = self.current_parent_request.hash;
let (block, blobs) = block_wrapper.deconstruct();
self.downloaded_blocks.push((current_root, block));
self.downloaded_blobs.push(blobs);
self.current_parent_request.hash = next_parent;
self.current_parent_request.state = single_block_lookup::State::AwaitingDownload;
self.current_parent_request_id = None;
@@ -120,14 +131,23 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
let ParentLookup {
chain_hash,
downloaded_blocks,
downloaded_blobs,
current_parent_request,
current_parent_request_id: _,
} = self;
let block_count = downloaded_blocks.len();
let mut blocks = Vec::with_capacity(block_count);
let mut hashes = Vec::with_capacity(block_count);
for (hash, block) in downloaded_blocks {
blocks.push(block);
for ((hash, block), blobs) in downloaded_blocks
.into_iter()
.zip(downloaded_blobs.into_iter())
{
let wrapped_block = if let Some(blobs) = blobs {
BlockWrapper::BlockAndBlobs(block, blobs)
} else {
BlockWrapper::Block(block)
};
blocks.push(wrapped_block);
hashes.push(hash);
}
(chain_hash, blocks, hashes, current_parent_request)
@@ -152,7 +172,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
/// the processing result of the block.
pub fn verify_block(
&mut self,
block: Option<BlockWrapper<T::EthSpec>>,
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
failed_chains: &mut lru_cache::LRUTimeCache<Hash256>,
) -> Result<Option<RootBlockTuple<T::EthSpec>>, VerifyError> {
let root_and_block = self.current_parent_request.verify_block(block)?;

View File

@@ -6,10 +6,15 @@ use lighthouse_network::{rpc::BlocksByRootRequest, PeerId};
use rand::seq::IteratorRandom;
use ssz_types::VariableList;
use std::collections::HashSet;
use std::sync::Arc;
use store::{EthSpec, Hash256};
use strum::IntoStaticStr;
use types::blob_sidecar::BlobIdentifier;
use types::SignedBeaconBlock;
/// Object representing a single block lookup request.
///
//previously assumed we would have a single block. Now we may have the block but not the blobs
#[derive(PartialEq, Eq)]
pub struct SingleBlockRequest<const MAX_ATTEMPTS: u8> {
/// The hash of the requested block.
@@ -24,6 +29,7 @@ pub struct SingleBlockRequest<const MAX_ATTEMPTS: u8> {
failed_processing: u8,
/// How many times have we attempted to download this block.
failed_downloading: u8,
missing_blobs: Vec<BlobIdentifier>,
}
#[derive(Debug, PartialEq, Eq)]
@@ -59,6 +65,7 @@ impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
used_peers: HashSet::default(),
failed_processing: 0,
failed_downloading: 0,
missing_blobs: vec![],
}
}
@@ -105,7 +112,7 @@ impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
/// Returns the block for processing if the response is what we expected.
pub fn verify_block<T: EthSpec>(
&mut self,
block: Option<BlockWrapper<T>>,
block: Option<Arc<SignedBeaconBlock<T>>>,
) -> Result<Option<RootBlockTuple<T>>, VerifyError> {
match self.state {
State::AwaitingDownload => {
@@ -116,7 +123,7 @@ impl<const MAX_ATTEMPTS: u8> SingleBlockRequest<MAX_ATTEMPTS> {
Some(block) => {
// Compute the block root using this specific function so that we can get timing
// metrics.
let block_root = get_block_root(block.as_block());
let block_root = get_block_root(&block);
if block_root != self.hash {
// return an error and drop the block
// NOTE: we take this is as a download failure to prevent counting the

View File

@@ -83,11 +83,11 @@ pub enum RequestId {
/// Request was from the backfill sync algorithm.
BackFillBlocks { id: Id },
/// Backfill request that is composed by both a block range request and a blob range request.
BackFillBlobs { id: Id },
BackFillBlockAndBlobs { id: Id },
/// The request was from a chain in the range sync algorithm.
RangeBlocks { id: Id },
/// Range request that is composed by both a block range request and a blob range request.
RangeBlobs { id: Id },
RangeBlockAndBlobs { id: Id },
}
// TODO(diva) I'm updating functions what at a time, but this should be revisited because I think
@@ -157,6 +157,12 @@ pub enum SyncMessage<T: EthSpec> {
process_type: BlockProcessType,
result: BlockProcessResult<T>,
},
/// Block processed
BlobProcessed {
process_type: BlockProcessType,
result: Result<AvailabilityProcessingStatus, BlockError<T>>,
},
}
/// The type of processing specified for a received block.
@@ -168,8 +174,7 @@ pub enum BlockProcessType {
#[derive(Debug)]
pub enum BlockProcessResult<T: EthSpec> {
Ok,
MissingBlobs(Vec<BlobIdentifier>),
Ok(AvailabilityProcessingStatus),
Err(BlockError<T>),
Ignored,
}
@@ -322,7 +327,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
RequestId::BackFillBlobs { id } => {
RequestId::BackFillBlockAndBlobs { id } => {
if let Some(batch_id) = self
.network
.backfill_request_failed(id, ByRangeRequestType::BlocksAndBlobs)
@@ -351,7 +356,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.update_sync_state()
}
}
RequestId::RangeBlobs { id } => {
RequestId::RangeBlockAndBlobs { id } => {
if let Some((chain_id, batch_id)) = self
.network
.range_sync_request_failed(id, ByRangeRequestType::BlocksAndBlobs)
@@ -576,24 +581,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
beacon_block,
seen_timestamp,
} => {
self.rpc_block_or_blob_received(
request_id,
peer_id,
beacon_block.into(),
seen_timestamp,
);
self.rpc_block_received(request_id, peer_id, beacon_block, seen_timestamp);
}
SyncMessage::RpcBlob {
request_id,
peer_id,
blob_sidecar,
seen_timestamp,
} => self.rpc_block_or_blob_received(
request_id,
peer_id,
blob_sidecar.into(),
seen_timestamp,
),
} => self.rpc_blob_received(request_id, peer_id, blob_sidecar, seen_timestamp),
SyncMessage::UnknownBlock(peer_id, block, block_root) => {
// If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore
if !self.network_globals.sync_state.read().is_synced() {
@@ -670,6 +665,18 @@ impl<T: BeaconChainTypes> SyncManager<T> {
.block_lookups
.parent_block_processed(chain_hash, result, &mut self.network),
},
SyncMessage::BlobProcessed {
process_type,
result,
} => match process_type {
BlockProcessType::SingleBlock { id } => {
self.block_lookups
.single_blob_processed(id, result.into(), &mut self.network)
}
BlockProcessType::ParentLookup { chain_hash } => self
.block_lookups
.parent_blob_processed(chain_hash, result.into(), &mut self.network),
},
SyncMessage::BatchProcessed { sync_type, result } => match sync_type {
ChainSegmentProcessId::RangeBatchId(chain_id, epoch, _) => {
self.range_sync.handle_block_process_result(
@@ -763,50 +770,30 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
fn rpc_block_or_blob_received(
fn rpc_block_received(
&mut self,
request_id: RequestId,
peer_id: PeerId,
block_or_blob: BlockOrBlob<T::EthSpec>,
block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
seen_timestamp: Duration,
) {
match request_id {
RequestId::SingleBlock { id } => {
// TODO(diva) adjust when dealing with by root requests. This code is here to
// satisfy dead code analysis
match block_or_blob {
BlockOrBlob::Block(maybe_block) => {
self.block_lookups.single_block_lookup_response(
id,
peer_id,
maybe_block.map(BlockWrapper::Block),
seen_timestamp,
&mut self.network,
)
}
BlockOrBlob::Sidecar(_) => unimplemented!("Mimatch between BlockWrapper and what the network receives needs to be handled first."),
}
}
RequestId::ParentLookup { id } => {
// TODO(diva) adjust when dealing with by root requests. This code is here to
// satisfy dead code analysis
match block_or_blob {
BlockOrBlob::Block(maybe_block) => self.block_lookups.parent_lookup_response(
id,
peer_id,
maybe_block.map(BlockWrapper::Block),
seen_timestamp,
&mut self.network,
),
BlockOrBlob::Sidecar(_) => unimplemented!("Mimatch between BlockWrapper and what the network receives needs to be handled first."),
}
}
RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response(
id,
peer_id,
block,
seen_timestamp,
&mut self.network,
),
RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response(
id,
peer_id,
block,
seen_timestamp,
&mut self.network,
),
RequestId::BackFillBlocks { id } => {
let maybe_block = match block_or_blob {
BlockOrBlob::Block(maybe_block) => maybe_block,
BlockOrBlob::Sidecar(_) => todo!("I think this is unreachable"),
};
let is_stream_terminator = maybe_block.is_none();
let is_stream_terminator = block.is_none();
if let Some(batch_id) = self
.network
.backfill_sync_only_blocks_response(id, is_stream_terminator)
@@ -816,7 +803,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
batch_id,
&peer_id,
id,
maybe_block.map(|block| block.into()),
block.map(BlockWrapper::Block),
) {
Ok(ProcessResult::SyncCompleted) => self.update_sync_state(),
Ok(ProcessResult::Successful) => {}
@@ -829,14 +816,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
RequestId::RangeBlocks { id } => {
let maybe_block = match block_or_blob {
BlockOrBlob::Block(maybe_block) => maybe_block,
BlockOrBlob::Sidecar(_) => todo!("I think this should be unreachable, since this is a range only-blocks request, and the network should not accept this chunk at all. Needs better handling"),
};
let is_stream_terminator = maybe_block.is_none();
let is_stream_terminator = block.is_none();
if let Some((chain_id, batch_id)) = self
.network
.range_sync_block_response(id, is_stream_terminator)
.range_sync_block_only_response(id, is_stream_terminator)
{
self.range_sync.blocks_by_range_response(
&mut self.network,
@@ -844,17 +827,53 @@ impl<T: BeaconChainTypes> SyncManager<T> {
chain_id,
batch_id,
id,
maybe_block.map(|block| block.into()),
block.map(BlockWrapper::Block),
);
self.update_sync_state();
}
}
RequestId::BackFillBlobs { id } => {
self.backfill_block_and_blobs_response(id, peer_id, block_or_blob)
RequestId::BackFillBlockAndBlobs { id } => {
self.backfill_block_and_blobs_response(id, peer_id, block.into())
}
RequestId::RangeBlobs { id } => {
self.range_block_and_blobs_response(id, peer_id, block_or_blob)
RequestId::RangeBlockAndBlobs { id } => {
self.range_block_and_blobs_response(id, peer_id, block.into())
}
}
}
fn rpc_blob_received(
&mut self,
request_id: RequestId,
peer_id: PeerId,
blob: Option<Arc<BlobSidecar<T::EthSpec>>>,
seen_timestamp: Duration,
) {
match request_id {
RequestId::SingleBlock { id } => self.block_lookups.single_lookup_blob_response(
id,
peer_id,
blob,
seen_timestamp,
&mut self.network,
),
RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_blob_response(
id,
peer_id,
blob,
seen_timestamp,
&mut self.network,
),
RequestId::BackFillBlocks { id } => {
todo!()
}
RequestId::RangeBlocks { id } => {
todo!()
}
RequestId::BackFillBlockAndBlobs { id } => {
self.backfill_block_and_blobs_response(id, peer_id, blob.into())
}
RequestId::RangeBlockAndBlobs { id } => {
self.range_block_and_blobs_response(id, peer_id, blob.into())
}
}
}
@@ -898,7 +917,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"peer_id" => %peer_id, "batch_id" => resp.batch_id, "error" => e
);
// TODO: penalize the peer for being a bad boy
let id = RequestId::RangeBlobs { id };
let id = RequestId::RangeBlockAndBlobs { id };
self.inject_error(peer_id, id, RPCError::InvalidData(e.into()))
}
}
@@ -950,7 +969,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"peer_id" => %peer_id, "batch_id" => resp.batch_id, "error" => e
);
// TODO: penalize the peer for being a bad boy
let id = RequestId::BackFillBlobs { id };
let id = RequestId::BackFillBlockAndBlobs { id };
self.inject_error(peer_id, id, RPCError::InvalidData(e.into()))
}
}
@@ -963,14 +982,8 @@ impl<T: EthSpec> From<Result<AvailabilityProcessingStatus, BlockError<T>>>
{
fn from(result: Result<AvailabilityProcessingStatus, BlockError<T>>) -> Self {
match result {
Ok(AvailabilityProcessingStatus::Imported(_)) => BlockProcessResult::Ok,
Ok(AvailabilityProcessingStatus::PendingBlock(_)) => {
todo!() // doesn't make sense
}
Ok(AvailabilityProcessingStatus::PendingBlobs(blobs)) => {
BlockProcessResult::MissingBlobs(blobs)
}
Err(e) => e.into(),
Ok(status) => BlockProcessResult::Ok(status),
Err(e) => BlockProcessResult::Err(e),
}
}
}

View File

@@ -186,7 +186,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// create the shared request id. This is fine since the rpc handles substream ids.
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::RangeBlobs { id });
let request_id = RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id });
// Create the blob request based on the blob request.
let blobs_request = Request::BlobsByRange(BlobsByRangeRequest {
@@ -259,7 +259,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// create the shared request id. This is fine since the rpc handles substream ids.
let id = self.next_id();
let request_id = RequestId::Sync(SyncRequestId::BackFillBlobs { id });
let request_id = RequestId::Sync(SyncRequestId::BackFillBlockAndBlobs { id });
// Create the blob request based on the blob request.
let blobs_request = Request::BlobsByRange(BlobsByRangeRequest {
@@ -288,7 +288,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
/// Response for a request that is only for blocks.
pub fn range_sync_block_response(
pub fn range_sync_block_only_response(
&mut self,
request_id: Id,
is_stream_terminator: bool,

View File

@@ -685,7 +685,7 @@ mod tests {
range.add_peer(&mut rig.cx, local_info, peer1, head_info);
let ((chain1, batch1), id1) = match rig.grab_request(&peer1).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => {
(rig.cx.range_sync_block_response(id, true).unwrap(), id)
(rig.cx.range_sync_block_only_response(id, true).unwrap(), id)
}
other => panic!("unexpected request {:?}", other),
};
@@ -704,7 +704,7 @@ mod tests {
range.add_peer(&mut rig.cx, local_info, peer2, finalized_info);
let ((chain2, batch2), id2) = match rig.grab_request(&peer2).0 {
RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => {
(rig.cx.range_sync_block_response(id, true).unwrap(), id)
(rig.cx.range_sync_block_only_response(id, true).unwrap(), id)
}
other => panic!("unexpected request {:?}", other),
};