mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-17 10:48:28 +00:00
much work
This commit is contained in:
@@ -115,7 +115,7 @@ use tokio_stream::Stream;
|
||||
use tree_hash::TreeHash;
|
||||
use types::beacon_block_body::KzgCommitments;
|
||||
use types::beacon_state::CloneConfig;
|
||||
use types::blob_sidecar::{BlobIdentifier, BlobSidecarList, Blobs};
|
||||
use types::blob_sidecar::{BlobSidecarList, Blobs};
|
||||
use types::consts::deneb::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
|
||||
use types::consts::merge::INTERVALS_PER_SLOT;
|
||||
use types::*;
|
||||
@@ -190,7 +190,7 @@ pub enum WhenSlotSkipped {
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum AvailabilityProcessingStatus {
|
||||
MissingParts(Hash256),
|
||||
MissingParts(Slot, Hash256),
|
||||
Imported(Hash256),
|
||||
}
|
||||
|
||||
@@ -2670,8 +2670,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
AvailabilityProcessingStatus::Imported(_) => {
|
||||
// The block was imported successfully.
|
||||
}
|
||||
AvailabilityProcessingStatus::MissingParts(block_root) => {
|
||||
//TODO(sean) fail
|
||||
AvailabilityProcessingStatus::MissingParts(slot, block_root) => {
|
||||
return ChainSegmentResult::Failed {
|
||||
imported_blocks,
|
||||
error: BlockError::MissingBlockParts(slot, block_root),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2748,6 +2751,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
count_unrealized: CountUnrealized,
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
|
||||
self.check_availability_and_maybe_import(
|
||||
blob.slot(),
|
||||
|chain| chain.data_availability_checker.put_gossip_blob(blob),
|
||||
count_unrealized,
|
||||
)
|
||||
@@ -2804,6 +2808,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
ExecutedBlock::AvailabilityPending(block) => {
|
||||
self.check_availability_and_maybe_import(
|
||||
block.block.slot(),
|
||||
|chain| {
|
||||
chain
|
||||
.data_availability_checker
|
||||
@@ -2907,6 +2912,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// (i.e., this function is not atomic).
|
||||
pub async fn check_availability_and_maybe_import(
|
||||
self: &Arc<Self>,
|
||||
slot: Slot,
|
||||
cache_fn: impl FnOnce(Arc<Self>) -> Result<Availability<T::EthSpec>, AvailabilityCheckError>,
|
||||
count_unrealized: CountUnrealized,
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
|
||||
@@ -2916,7 +2922,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
self.import_available_block(block, count_unrealized).await
|
||||
}
|
||||
Availability::MissingParts(block_root) => {
|
||||
Ok(AvailabilityProcessingStatus::MissingParts(block_root))
|
||||
Ok(AvailabilityProcessingStatus::MissingParts(slot, block_root))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,6 +147,9 @@ impl<T: EthSpec> GossipVerifiedBlob<T> {
|
||||
pub fn to_blob(self) -> Arc<BlobSidecar<T>> {
|
||||
self.blob
|
||||
}
|
||||
pub fn slot(&self) -> Slot {
|
||||
self.blob.slot
|
||||
}
|
||||
}
|
||||
|
||||
pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
|
||||
|
||||
@@ -312,6 +312,7 @@ pub enum BlockError<T: EthSpec> {
|
||||
},
|
||||
BlobValidation(BlobError),
|
||||
AvailabilityCheck(AvailabilityCheckError),
|
||||
MissingBlockParts(Slot, Hash256),
|
||||
}
|
||||
|
||||
impl<T: EthSpec> From<BlobError> for BlockError<T> {
|
||||
|
||||
@@ -2,9 +2,7 @@ use crate::blob_verification::{
|
||||
verify_kzg_for_blob, verify_kzg_for_blob_list, AsBlock, BlockWrapper, GossipVerifiedBlob,
|
||||
KzgVerifiedBlob, KzgVerifiedBlobList, MaybeAvailableBlock,
|
||||
};
|
||||
use crate::block_verification::{
|
||||
AvailabilityPendingExecutedBlock, AvailableExecutedBlock, IntoExecutionPendingBlock,
|
||||
};
|
||||
use crate::block_verification::{AvailabilityPendingExecutedBlock, AvailableExecutedBlock};
|
||||
|
||||
use kzg::Error as KzgError;
|
||||
use kzg::Kzg;
|
||||
@@ -13,15 +11,14 @@ use slot_clock::SlotClock;
|
||||
use ssz_types::{Error, FixedVector, VariableList};
|
||||
use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions;
|
||||
use std::collections::hash_map::{Entry, OccupiedEntry};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::ops::Index;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use types::beacon_block_body::KzgCommitments;
|
||||
use types::blob_sidecar::{BlobIdentifier, BlobSidecar};
|
||||
use types::consts::deneb::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
|
||||
use types::{
|
||||
BeaconBlockRef, BlobSidecarList, ChainSpec, Epoch, EthSpec, ExecPayload, FullPayload, Hash256,
|
||||
SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot,
|
||||
SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -202,16 +199,20 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
||||
block_root: Hash256,
|
||||
blobs: FixedVector<Option<Arc<BlobSidecar<T>>>, T::MaxBlobsPerBlock>,
|
||||
) -> Result<Availability<T>, AvailabilityCheckError> {
|
||||
//TODO(sean) merge with existing blobs, only kzg verify blobs we haven't yet verified
|
||||
// TODO(sean) we may duplicated kzg verification on some blobs we already have cached so we could optimize this
|
||||
|
||||
// Verify the KZG commitment.
|
||||
let kzg_verified_blobs = if let Some(kzg) = self.kzg.as_ref() {
|
||||
verify_kzg_for_blob_list(blobs, kzg)?
|
||||
let mut verified_blobs = vec![];
|
||||
if let Some(kzg) = self.kzg.as_ref() {
|
||||
for blob_opt in blobs.into_iter() {
|
||||
if let Some(blob) = blob_opt {
|
||||
verified_blobs.push(verify_kzg_for_blob(blob.clone(), kzg)?)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(AvailabilityCheckError::KzgNotInitialized);
|
||||
};
|
||||
|
||||
self.put_kzg_verified_blobs(block_root, &kzg_verified_blobs)
|
||||
self.put_kzg_verified_blobs(block_root, &verified_blobs)
|
||||
}
|
||||
|
||||
/// This first validates the KZG commitments included in the blob sidecar.
|
||||
@@ -366,7 +367,12 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
||||
.kzg
|
||||
.as_ref()
|
||||
.ok_or(AvailabilityCheckError::KzgNotInitialized)?;
|
||||
let verified_blobs = verify_kzg_for_blob_list(blob_list, kzg)?;
|
||||
let filtered_blobs = blob_list
|
||||
.to_vec()
|
||||
.into_iter()
|
||||
.filter_map(|blob| blob)
|
||||
.collect();
|
||||
let verified_blobs = verify_kzg_for_blob_list(filtered_blobs, kzg)?;
|
||||
|
||||
Ok(MaybeAvailableBlock::Available(
|
||||
self.check_availability_with_blobs(block, verified_blobs)?,
|
||||
@@ -375,40 +381,6 @@ impl<T: EthSpec, S: SlotClock> DataAvailabilityChecker<T, S> {
|
||||
}
|
||||
}
|
||||
|
||||
/// For a given block wrapper, find the missing blobs. Useful for parent unknown blocks.
|
||||
/// Because these don't otherwise hit the data availability caches.
|
||||
pub fn get_missing_blob_ids(
|
||||
&self,
|
||||
block: BlockWrapper<T>,
|
||||
block_root: Option<Hash256>,
|
||||
) -> Result<Vec<BlobIdentifier>, AvailabilityCheckError> {
|
||||
let (block, blobs) = block.deconstruct();
|
||||
let maybe_available = self.check_availability_without_blobs(block)?;
|
||||
let blob_ids = match &maybe_available {
|
||||
MaybeAvailableBlock::Available(_) => {
|
||||
vec![]
|
||||
}
|
||||
MaybeAvailableBlock::AvailabilityPending(pending_block) => {
|
||||
if let Some(blobs) = blobs {
|
||||
pending_block.get_filtered_blob_ids(block_root, |index_usize, block_root| {
|
||||
let index = index_usize as u64;
|
||||
let blob_in_wrapper = blobs
|
||||
.get(index_usize)
|
||||
.map(|blob| blob.index == index)
|
||||
.unwrap_or(false);
|
||||
let blob_in_cache = self
|
||||
.get_blob(&BlobIdentifier { block_root, index })
|
||||
.is_some();
|
||||
!blob_in_wrapper && !blob_in_cache
|
||||
})
|
||||
} else {
|
||||
pending_block.get_all_blob_ids(block_root)
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(blob_ids)
|
||||
}
|
||||
|
||||
/// Checks if a block is available, returning an error if the block is not immediately available.
|
||||
/// Does not access the gossip cache.
|
||||
pub fn try_check_availability(
|
||||
@@ -582,6 +554,9 @@ pub struct AvailabilityPendingBlock<E: EthSpec> {
|
||||
}
|
||||
|
||||
impl<E: EthSpec> AvailabilityPendingBlock<E> {
|
||||
pub fn slot(&self) -> Slot {
|
||||
self.block.slot()
|
||||
}
|
||||
pub fn num_blobs_expected(&self) -> usize {
|
||||
self.kzg_commitments()
|
||||
.map_or(0, |commitments| commitments.len())
|
||||
@@ -712,7 +687,12 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
|
||||
fn into_block_wrapper(self) -> BlockWrapper<E> {
|
||||
let (block, blobs_opt) = self.deconstruct();
|
||||
if let Some(blobs) = blobs_opt {
|
||||
BlockWrapper::BlockAndBlobs(block, blobs.to_vec())
|
||||
let blobs_vec = blobs
|
||||
.to_vec()
|
||||
.into_iter()
|
||||
.map(Option::Some)
|
||||
.collect::<Vec<_>>();
|
||||
BlockWrapper::BlockAndBlobs(block, FixedVector::from(blobs_vec))
|
||||
} else {
|
||||
BlockWrapper::Block(block)
|
||||
}
|
||||
|
||||
@@ -136,7 +136,7 @@ pub async fn publish_block<T: BeaconChainTypes>(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Ok(AvailabilityProcessingStatus::MissingParts(block_root)) => {
|
||||
Ok(AvailabilityProcessingStatus::MissingParts(_, block_root)) => {
|
||||
let msg = format!("Missing parts of block with root {:?}", block_root);
|
||||
error!(
|
||||
log,
|
||||
|
||||
@@ -66,7 +66,7 @@ use task_executor::TaskExecutor;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use types::{
|
||||
Attestation, AttesterSlashing, BlobSidecar, Hash256, LightClientFinalityUpdate,
|
||||
Attestation, AttesterSlashing, BlobSidecar, EthSpec, Hash256, LightClientFinalityUpdate,
|
||||
LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock,
|
||||
SignedBlobSidecar, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit,
|
||||
SubnetId, SyncCommitteeMessage, SyncSubnetId,
|
||||
@@ -632,7 +632,10 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
|
||||
|
||||
pub fn rpc_blobs(
|
||||
block_root: Hash256,
|
||||
blobs: FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>,
|
||||
blobs: FixedVector<
|
||||
Option<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
<<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxBlobsPerBlock,
|
||||
>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
) -> Self {
|
||||
@@ -948,7 +951,10 @@ pub enum Work<T: BeaconChainTypes> {
|
||||
},
|
||||
RpcBlobs {
|
||||
block_root: Hash256,
|
||||
blobs: FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>,
|
||||
blobs: FixedVector<
|
||||
Option<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
<<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxBlobsPerBlock,
|
||||
>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
},
|
||||
|
||||
@@ -27,7 +27,6 @@ use slot_clock::SlotClock;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::time::Duration;
|
||||
use strum::AsRefStr;
|
||||
@@ -36,8 +35,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
|
||||
use tokio::time::error::Error as TimeError;
|
||||
use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey};
|
||||
use types::{
|
||||
Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof,
|
||||
SignedBeaconBlock, SubnetId,
|
||||
Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, SubnetId,
|
||||
};
|
||||
|
||||
const TASK_NAME: &str = "beacon_processor_reprocess_queue";
|
||||
|
||||
@@ -680,21 +680,19 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
// This value is not used presently, but it might come in handy for debugging.
|
||||
_seen_duration: Duration,
|
||||
) {
|
||||
// TODO
|
||||
match self
|
||||
.chain
|
||||
.process_blob(verified_blob, CountUnrealized::True)
|
||||
.await
|
||||
{
|
||||
Ok(AvailabilityProcessingStatus::Imported(_hash)) => {
|
||||
todo!()
|
||||
// add to metrics
|
||||
// logging
|
||||
//TODO(sean) add metrics and logging
|
||||
self.chain.recompute_head_at_current_slot().await;
|
||||
}
|
||||
Ok(AvailabilityProcessingStatus::MissingParts(block_hash)) => {
|
||||
Ok(AvailabilityProcessingStatus::MissingParts(slot, block_hash)) => {
|
||||
self.send_sync_message(SyncMessage::UnknownBlockHashFromGossipBlob(
|
||||
peer_id, block_hash,
|
||||
)); //TODO(sean) update
|
||||
slot, peer_id, block_hash,
|
||||
));
|
||||
}
|
||||
Err(_err) => {
|
||||
// handle errors
|
||||
@@ -991,6 +989,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
);
|
||||
None
|
||||
}
|
||||
_ => todo!(), //TODO(sean)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1045,12 +1044,10 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
|
||||
self.chain.recompute_head_at_current_slot().await;
|
||||
}
|
||||
Ok(AvailabilityProcessingStatus::MissingParts(block_root)) => {
|
||||
Ok(AvailabilityProcessingStatus::MissingParts(slot, block_root)) => {
|
||||
// make rpc request for blob
|
||||
self.send_sync_message(SyncMessage::UnknownBlockHashFromGossipBlob(
|
||||
peer_id,
|
||||
block_root,
|
||||
Duration::from_secs(0), //TODO(sean) update
|
||||
slot, peer_id, block_root,
|
||||
));
|
||||
}
|
||||
Err(BlockError::ParentUnknown(block)) => {
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::beacon_processor::work_reprocessing_queue::QueuedRpcBlock;
|
||||
use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE;
|
||||
use crate::beacon_processor::DuplicateCache;
|
||||
use crate::metrics;
|
||||
use crate::sync::manager::{BlockProcessType, SyncMessage};
|
||||
use crate::sync::manager::{BlockProcessType, ResponseType, SyncMessage};
|
||||
use crate::sync::{BatchProcessResult, ChainId};
|
||||
use beacon_chain::blob_verification::AsBlock;
|
||||
use beacon_chain::blob_verification::BlockWrapper;
|
||||
@@ -144,7 +144,10 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
pub async fn process_rpc_blobs(
|
||||
self,
|
||||
block_root: Hash256,
|
||||
blobs: FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>,
|
||||
blobs: FixedVector<
|
||||
Option<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
<<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxBlobsPerBlock,
|
||||
>,
|
||||
seen_timestamp: Duration,
|
||||
process_type: BlockProcessType,
|
||||
) {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use beacon_chain::blob_verification::{AsBlock, BlockWrapper, MaybeAvailableBlock};
|
||||
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
|
||||
use beacon_chain::data_availability_checker::DataAvailabilityChecker;
|
||||
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
|
||||
use lighthouse_network::rpc::RPCError;
|
||||
@@ -7,11 +7,11 @@ use lru_cache::LRUTimeCache;
|
||||
use slog::{debug, error, trace, warn, Logger};
|
||||
use smallvec::SmallVec;
|
||||
use ssz_types::FixedVector;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use store::Hash256;
|
||||
use types::{BlobSidecar, SignedBeaconBlock, Slot};
|
||||
use types::{BlobSidecar, EthSpec, SignedBeaconBlock, Slot};
|
||||
|
||||
use self::parent_lookup::{LookupDownloadStatus, PARENT_FAIL_TOLERANCE};
|
||||
use self::parent_lookup::{ParentLookup, ParentVerifyError};
|
||||
@@ -259,7 +259,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
let triggered_parent_request = self
|
||||
.parent_lookups
|
||||
.iter()
|
||||
.any(|lookup| lookup.chain_hash() == block_root);
|
||||
.any(|lookup| lookup.chain_hash() == root);
|
||||
|
||||
if triggered_parent_request {
|
||||
// The lookup status here is irrelevant because we wait until the parent chain
|
||||
@@ -269,7 +269,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
// This is the correct block, send it for processing
|
||||
if self
|
||||
.send_block_for_processing(
|
||||
block_root,
|
||||
root,
|
||||
BlockWrapper::Block(block),
|
||||
seen_timestamp,
|
||||
BlockProcessType::SingleBlock { id },
|
||||
@@ -290,7 +290,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
cx.report_peer(peer_id, PeerAction::LowToleranceError, msg);
|
||||
|
||||
debug!(self.log, "Single block lookup failed";
|
||||
"peer_id" => %peer_id, "error" => msg, "block_root" => %request.requested_block_root);
|
||||
"peer_id" => %peer_id, "error" => msg, "block_root" => %request_ref.requested_block_root);
|
||||
// try the request again if possible
|
||||
if let Ok((peer_id, request)) = request_ref.request_block() {
|
||||
if let Ok(id) = cx.single_block_lookup_request(peer_id, request) {
|
||||
@@ -337,7 +337,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
if triggered_parent_request {
|
||||
// The lookup status here is irrelevant because we wait until the parent chain
|
||||
// is complete before processing the block.
|
||||
let _ = request_ref.add_block(root, block)?;
|
||||
let _ = request_ref.add_blobs(block_root, blobs)?;
|
||||
} else {
|
||||
// These are the correct blobs, send them for processing
|
||||
if self
|
||||
@@ -362,7 +362,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
cx.report_peer(peer_id, PeerAction::LowToleranceError, msg);
|
||||
|
||||
debug!(self.log, "Single block lookup failed";
|
||||
"peer_id" => %peer_id, "error" => msg, "block_root" => %request.requested_block_root);
|
||||
"peer_id" => %peer_id, "error" => msg, "block_root" => %request_ref.requested_block_root);
|
||||
// try the request again if possible
|
||||
if let Ok((peer_id, request)) = request_ref.request_blobs() {
|
||||
if let Ok(id) = cx.single_blobs_lookup_request(peer_id, request) {
|
||||
@@ -566,10 +566,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
self.search_block(block_root, peer_id, cx);
|
||||
self.parent_lookups.push(parent_lookup)
|
||||
}
|
||||
LookupDownloadStatus::Err(e) => {
|
||||
warn!(self.log, "Peer sent invalid response to parent request.";
|
||||
"peer_id" => %peer_id, "reason" => %e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
@@ -748,7 +744,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
self.single_block_lookups.retain_mut(|(block_id, blob_id, req)|{
|
||||
if &Some(id) == block_id {
|
||||
req.block_request_state.register_failure_downloading();
|
||||
trace!(self.log, "Single block lookup failed"; "block" => %request.requested_block_root);
|
||||
trace!(self.log, "Single block lookup failed"; "block" => %req.requested_block_root);
|
||||
match req.request_block() {
|
||||
Ok(Some((peer_id, block_request))) => {
|
||||
if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) {
|
||||
@@ -769,7 +765,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
}
|
||||
if &Some(id) == blob_id {
|
||||
req.blob_request_state.register_failure_downloading();
|
||||
trace!(self.log, "Single blob lookup failed"; "block" => %request.requested_block_root);
|
||||
trace!(self.log, "Single blob lookup failed"; "block" => %req.requested_block_root);
|
||||
match req.request_blobs() {
|
||||
Ok(Some((peer_id, blob_request))) => {
|
||||
if let Ok(request_id) = cx.single_blobs_lookup_request(peer_id, blob_request) {
|
||||
@@ -851,7 +847,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
trace!(self.log, "Single block processing succeeded"; "block" => %root);
|
||||
true
|
||||
}
|
||||
AvailabilityProcessingStatus::MissingParts(block_root) => {
|
||||
AvailabilityProcessingStatus::MissingParts(_, block_root) => {
|
||||
self.search_block(block_root, peer_id, cx);
|
||||
false
|
||||
}
|
||||
@@ -907,7 +903,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
ResponseType::Block => {
|
||||
req.block_request_state.register_failure_processing();
|
||||
match req.request_block() {
|
||||
Ok(Some((peer_id, requeest))) => {
|
||||
Ok(Some((peer_id, request))) => {
|
||||
if let Ok(request_id) =
|
||||
cx.single_block_lookup_request(peer_id, request)
|
||||
{
|
||||
@@ -965,7 +961,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
.parent_lookups
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(index, _)| lookup.chain_hash() == chain_hash)
|
||||
.find(|(_, lookup)| lookup.chain_hash() == chain_hash)
|
||||
.map(|(index, _)| index);
|
||||
|
||||
let Some(mut parent_lookup) = index.map(|index|self.parent_lookups.remove(index)) else {
|
||||
@@ -993,7 +989,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
AvailabilityProcessingStatus::Imported(hash) => {
|
||||
trace!(self.log, "Parent block processing succeeded"; &parent_lookup)
|
||||
}
|
||||
AvailabilityProcessingStatus::MissingParts(block_root) => {
|
||||
AvailabilityProcessingStatus::MissingParts(_, block_root) => {
|
||||
trace!(self.log, "Parent missing parts, triggering single block lookup "; &parent_lookup)
|
||||
}
|
||||
},
|
||||
@@ -1012,6 +1008,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
|
||||
match result {
|
||||
BlockPartProcessingResult::Ok(AvailabilityProcessingStatus::MissingParts(
|
||||
_,
|
||||
block_root,
|
||||
)) => {
|
||||
self.search_block(block_root, peer_id, cx);
|
||||
@@ -1124,14 +1121,42 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
debug!(self.log, "Parent chain processed"; "chain_hash" => %chain_hash, "result" => ?result);
|
||||
match result {
|
||||
BatchProcessResult::Success { .. } => {
|
||||
//TODO(sean) find single blob and block lookups and send for processing
|
||||
if let Some((index, (_, _, req))) = self
|
||||
.single_block_lookups
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(index, (_, _, req))| req.requested_block_root == chain_hash)
|
||||
{
|
||||
self.single_block_lookups
|
||||
.get_mut(index)
|
||||
.and_then(|(_, _, lookup)| lookup.get_downloaded_block())
|
||||
.map(|block_wrapper| {
|
||||
// This is the correct block, send it for processing
|
||||
if self
|
||||
.send_block_for_processing(
|
||||
chain_hash,
|
||||
block_wrapper,
|
||||
Duration::from_secs(0), //TODO(sean) pipe this through
|
||||
BlockProcessType::SingleBlock { id },
|
||||
cx,
|
||||
)
|
||||
.is_err()
|
||||
{
|
||||
// Remove to avoid inconsistencies
|
||||
self.single_block_lookups.remove(index);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
BatchProcessResult::FaultyFailure {
|
||||
imported_blocks: _,
|
||||
penalty,
|
||||
} => {
|
||||
//TODO(sean) improve peer scoring to block or blob granularity
|
||||
self.failed_chains.insert(chain_hash);
|
||||
for peer_id in request.used_peers {
|
||||
let mut all_peers = request.blob_request_state.used_peers.clone();
|
||||
all_peers.extend(request.blob_request_state.used_peers);
|
||||
for peer_id in all_peers {
|
||||
cx.report_peer(peer_id, penalty, "parent_chain_failure")
|
||||
}
|
||||
}
|
||||
@@ -1181,14 +1206,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
fn send_blobs_for_processing(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
blobs: FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>,
|
||||
seen_timestamp: Duration,
|
||||
id: BlockProcessType,
|
||||
blobs: FixedVector<
|
||||
Option<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
<<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxBlobsPerBlock,
|
||||
>,
|
||||
duration: Duration,
|
||||
process_type: BlockProcessType,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<(), ()> {
|
||||
match cx.processor_channel_if_enabled() {
|
||||
Some(beacon_processor_send) => {
|
||||
trace!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process" => ?process_type);
|
||||
trace!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process_type" => ?process_type);
|
||||
let event = WorkEvent::rpc_blobs(block_root, blobs, duration, process_type);
|
||||
if let Err(e) = beacon_processor_send.try_send(event) {
|
||||
error!(
|
||||
|
||||
@@ -192,7 +192,10 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
|
||||
pub fn add_blobs(
|
||||
&mut self,
|
||||
block_root: Hash256,
|
||||
blobs: FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>,
|
||||
blobs: FixedVector<
|
||||
Option<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
<<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxBlobsPerBlock,
|
||||
>,
|
||||
) -> Result<LookupDownloadStatus<T::EthSpec>, ParentVerifyError> {
|
||||
self.current_parent_blob_request_id = None;
|
||||
self.current_parent_request
|
||||
@@ -302,7 +305,10 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
|
||||
) -> Result<
|
||||
Option<(
|
||||
Hash256,
|
||||
FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>,
|
||||
FixedVector<
|
||||
Option<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
<<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxBlobsPerBlock,
|
||||
>,
|
||||
)>,
|
||||
ParentVerifyError,
|
||||
> {
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use crate::sync::block_lookups::parent_lookup::LookupDownloadStatus;
|
||||
use crate::sync::block_lookups::RootBlockTuple;
|
||||
use beacon_chain::blob_verification::AsBlock;
|
||||
use beacon_chain::blob_verification::BlockWrapper;
|
||||
use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker};
|
||||
use beacon_chain::{get_block_root, BeaconChainTypes};
|
||||
@@ -10,17 +9,19 @@ use rand::seq::IteratorRandom;
|
||||
use ssz_types::{FixedVector, VariableList};
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use store::{EthSpec, Hash256};
|
||||
use store::Hash256;
|
||||
use strum::IntoStaticStr;
|
||||
use types::blob_sidecar::BlobIdentifier;
|
||||
use types::{BlobSidecar, SignedBeaconBlock};
|
||||
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
|
||||
|
||||
pub struct SingleBlockLookup<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> {
|
||||
pub requested_block_root: Hash256,
|
||||
pub requested_ids: Vec<BlobIdentifier>,
|
||||
pub downloaded_blobs:
|
||||
FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>,
|
||||
pub downloaded_block: Option<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
pub downloaded_blobs: FixedVector<
|
||||
Option<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
<<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxBlobsPerBlock,
|
||||
>,
|
||||
pub downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
pub block_request_state: SingleLookupRequestState<MAX_ATTEMPTS>,
|
||||
pub blob_request_state: SingleLookupRequestState<MAX_ATTEMPTS>,
|
||||
pub da_checker: Arc<DataAvailabilityChecker<T::EthSpec, T::SlotClock>>,
|
||||
@@ -88,10 +89,25 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_downloaded_block(&mut self) -> Option<BlockWrapper<T::EthSpec>> {
|
||||
if self.requested_ids.is_empty() {
|
||||
if let Some(block) = self.downloaded_block.take() {
|
||||
return Some(BlockWrapper::BlockAndBlobs(
|
||||
block,
|
||||
self.downloaded_blobs.clone(),
|
||||
));
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn add_blobs(
|
||||
&mut self,
|
||||
block_root: Hash256,
|
||||
blobs: FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>,
|
||||
blobs: FixedVector<
|
||||
Option<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
<<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxBlobsPerBlock,
|
||||
>,
|
||||
) -> Result<LookupDownloadStatus<T::EthSpec>, LookupVerifyError> {
|
||||
for (index, blob_opt) in self.downloaded_blobs.iter_mut().enumerate() {
|
||||
if let Some(Some(downloaded_blob)) = blobs.get(index) {
|
||||
@@ -109,7 +125,7 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
|
||||
Err(e) => Err(LookupVerifyError::AvailabilityCheck(e)),
|
||||
}
|
||||
} else {
|
||||
Ok(LookupDownloadStatus::SearchBlock(block_hash))
|
||||
Ok(LookupDownloadStatus::SearchBlock(block_root))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,12 +134,8 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
) -> Result<LookupDownloadStatus<T::EthSpec>, LookupVerifyError> {
|
||||
for (index, blob_opt) in self.downloaded_blobs.iter_mut().enumerate() {
|
||||
if let Some(Some(downloaded_blob)) = blobs.get(index) {
|
||||
//TODO(sean) should we log a warn if there is already a downloaded blob?
|
||||
*blob_opt = Some(downloaded_blob);
|
||||
}
|
||||
}
|
||||
//TODO(sean) check for existing block?
|
||||
self.downloaded_block = Some(block);
|
||||
|
||||
match self
|
||||
.da_checker
|
||||
@@ -204,27 +216,32 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
|
||||
&mut self,
|
||||
blob: Option<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
) -> Result<
|
||||
Option<FixedVector<Option<Arc<BlobSidecar<T::EthSpec>>>, T::EthSpec::MaxBlobsPerBlock>>,
|
||||
BlobVerifyError,
|
||||
Option<
|
||||
FixedVector<
|
||||
Option<Arc<BlobSidecar<T::EthSpec>>>,
|
||||
<<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxBlobsPerBlock,
|
||||
>,
|
||||
>,
|
||||
LookupVerifyError,
|
||||
> {
|
||||
match self.block_request_state.state {
|
||||
State::AwaitingDownload => {
|
||||
self.blob_request_state.register_failure_downloading();
|
||||
Err(BlobVerifyError::ExtraBlobsReturned)
|
||||
Err(LookupVerifyError::ExtraBlobsReturned)
|
||||
}
|
||||
State::Downloading { peer_id } => match blob {
|
||||
Some(blob) => {
|
||||
let received_id = blob.id();
|
||||
if !self.requested_ids.contains(&received_id) {
|
||||
self.blob_request_state.register_failure_downloading();
|
||||
Err(BlobVerifyError::UnrequestedBlobId)
|
||||
Err(LookupVerifyError::UnrequestedBlobId)
|
||||
} else {
|
||||
// State should remain downloading until we receive the stream terminator.
|
||||
self.requested_ids.retain(|id| id != received_id);
|
||||
if let Some(blob_opt) = self.downloaded_blobs.get_mut(blob.index) {
|
||||
*blob_opt = Some(blob);
|
||||
} else {
|
||||
return Err(BlobVerifyError::InvalidIndex(blob.index));
|
||||
return Err(LookupVerifyError::InvalidIndex(blob.index));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -233,11 +250,11 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
|
||||
Ok(Some(self.downloaded_blobs.clone()))
|
||||
}
|
||||
},
|
||||
State::Processing { peer_id: _ } => match block {
|
||||
State::Processing { peer_id: _ } => match blob {
|
||||
Some(_) => {
|
||||
// We sent the blob for processing and received an extra blob.
|
||||
self.blob_request_state.register_failure_downloading();
|
||||
Err(BlobVerifyError::ExtraBlobsReturned)
|
||||
Err(LookupVerifyError::ExtraBlobsReturned)
|
||||
}
|
||||
None => {
|
||||
// This is simply the stream termination and we are already processing the
|
||||
@@ -304,7 +321,7 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
|
||||
.choose(&mut rand::thread_rng())
|
||||
{
|
||||
let request = BlobsByRootRequest {
|
||||
blob_ids: VariableList::from(self.requested_ids),
|
||||
blob_ids: VariableList::from(self.requested_ids.clone()),
|
||||
};
|
||||
self.blob_request_state.state = State::Downloading { peer_id };
|
||||
self.blob_request_state.used_peers.insert(peer_id);
|
||||
@@ -313,15 +330,6 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
|
||||
Err(LookupRequestError::NoPeers)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: only add peers if they have *all* blob ids in the request, this could probably be improved.
|
||||
pub fn add_peer_if_useful(&mut self, blob_id: &[BlobIdentifier], peer_id: &PeerId) -> bool {
|
||||
let is_useful = self.requested_ids.contains(blob_id);
|
||||
if is_useful {
|
||||
self.block_request_state.add_peer(peer_id);
|
||||
}
|
||||
is_useful
|
||||
}
|
||||
}
|
||||
|
||||
impl<const MAX_ATTEMPTS: u8> SingleLookupRequestState<MAX_ATTEMPTS> {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use beacon_chain::blob_verification::BlockWrapper;
|
||||
use ssz_types::FixedVector;
|
||||
use std::{collections::VecDeque, sync::Arc};
|
||||
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
|
||||
|
||||
@@ -55,7 +56,18 @@ impl<T: EthSpec> BlocksAndBlobsRequestInfo<T> {
|
||||
if blob_list.is_empty() {
|
||||
responses.push(BlockWrapper::Block(block))
|
||||
} else {
|
||||
responses.push(BlockWrapper::BlockAndBlobs(block, blob_list))
|
||||
let mut blobs_fixed = Vec::with_capacity(T::max_blobs_per_block());
|
||||
for blob in blob_list {
|
||||
let blob_index = blob.index as usize;
|
||||
if blob_index >= T::max_blobs_per_block() {
|
||||
return Err(format!("Invalid blob index {blob_index:?}").as_str());
|
||||
}
|
||||
blobs_fixed.insert(blob_index, Some(blob));
|
||||
}
|
||||
responses.push(BlockWrapper::BlockAndBlobs(
|
||||
block,
|
||||
FixedVector::from(blobs_fixed),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -41,10 +41,10 @@ use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
|
||||
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
|
||||
use crate::service::NetworkMessage;
|
||||
use crate::status::ToStatusMessage;
|
||||
use crate::sync::block_lookups::ResponseType;
|
||||
pub use crate::sync::block_lookups::ResponseType;
|
||||
use crate::sync::range_sync::ByRangeRequestType;
|
||||
use beacon_chain::blob_verification::AsBlock;
|
||||
use beacon_chain::blob_verification::BlockWrapper;
|
||||
use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock};
|
||||
use beacon_chain::{
|
||||
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState,
|
||||
};
|
||||
@@ -58,12 +58,10 @@ use slog::{crit, debug, error, info, trace, warn, Logger};
|
||||
use slot_clock::SlotClock;
|
||||
use std::boxed::Box;
|
||||
use std::ops::Sub;
|
||||
use std::sync::mpsc::TryRecvError;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::sleep;
|
||||
use types::blob_sidecar::BlobIdentifier;
|
||||
use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot};
|
||||
|
||||
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
|
||||
@@ -662,12 +660,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
}
|
||||
};
|
||||
|
||||
if block.slot() == self.chain.slot_clock.now() {
|
||||
if block.slot() == slot {
|
||||
if let Err(e) = self
|
||||
.delayed_lookups
|
||||
.send(SyncMessage::UnknownBlock(peer_id, block, block_root))
|
||||
.try_send(SyncMessage::UnknownBlock(peer_id, block, block_root))
|
||||
{
|
||||
warn!(self.log, "Delayed lookups receiver dropped for block"; "block_root" => block_hash);
|
||||
warn!(self.log, "Delayed lookups dropped for block"; "block_root" => ?block_root);
|
||||
}
|
||||
} else {
|
||||
self.block_lookups.search_current_unknown_parent(
|
||||
@@ -709,14 +707,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
};
|
||||
|
||||
if slot == current_slot {
|
||||
if let Err(e) =
|
||||
self.delayed_lookups
|
||||
.send(SyncMessage::UnknownBlockHashFromAttestation(
|
||||
peer_id, block_hash,
|
||||
))
|
||||
{
|
||||
warn!(self.log, "Delayed lookups receiver dropped for block referenced by a blob";
|
||||
"block_root" => block_hash);
|
||||
if let Err(e) = self.delayed_lookups.try_send(
|
||||
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash),
|
||||
) {
|
||||
warn!(self.log, "Delayed lookup dropped for block referenced by a blob";
|
||||
"block_root" => ?block_hash);
|
||||
}
|
||||
} else {
|
||||
self.block_lookups
|
||||
|
||||
Reference in New Issue
Block a user