some blob reprocessing work

This commit is contained in:
realbigsean
2023-03-28 18:29:56 -04:00
parent deec9c51ba
commit 8d80200bc4
4 changed files with 115 additions and 20 deletions

View File

@@ -692,12 +692,17 @@ impl<T: BeaconChainTypes> Worker<T> {
// logging
}
Ok(AvailabilityProcessingStatus::PendingBlobs(pending_blobs)) => self
.send_sync_message(SyncMessage::UnknownBlobHash {
.send_sync_message(SyncMessage::MissingBlobs {
peer_id,
pending_blobs,
search_delay: Duration::from_secs(0), //TODO(sean) update
}),
Ok(AvailabilityProcessingStatus::PendingBlock(block_hash)) => {
self.send_sync_message(SyncMessage::UnknownBlockHash(peer_id, block_hash));
self.send_sync_message(SyncMessage::UnknownBlockHashFromGossipBlob(
peer_id,
block_hash,
Duration::from_secs(0),
)); //TODO(sean) update
}
Err(_err) => {
// handle errors
@@ -1061,9 +1066,10 @@ impl<T: BeaconChainTypes> Worker<T> {
}
Ok(AvailabilityProcessingStatus::PendingBlobs(pending_blobs)) => {
// make rpc request for blob
self.send_sync_message(SyncMessage::UnknownBlobHash {
self.send_sync_message(SyncMessage::MissingBlobs {
peer_id,
pending_blobs,
search_delay: Duration::from_secs(0), //TODO(sean) update
});
}
Err(BlockError::AvailabilityCheck(_)) => {
@@ -1902,7 +1908,10 @@ impl<T: BeaconChainTypes> Worker<T> {
// We don't know the block, get the sync manager to handle the block lookup, and
// send the attestation to be scheduled for re-processing.
self.sync_tx
.send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root))
.send(SyncMessage::UnknownBlockHashFromAttestation(
peer_id,
*beacon_block_root,
))
.unwrap_or_else(|_| {
warn!(
self.log,

View File

@@ -101,7 +101,8 @@ impl<T: BeaconChainTypes> Worker<T> {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
// RPC block imported, regardless of process type
//TODO(sean) handle pending availability variants
//TODO(sean) do we need to do anything here for missing blobs? or is passing the result
// along to sync enough?
if let &Ok(AvailabilityProcessingStatus::Imported(hash)) = &result {
info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash);

View File

@@ -1,5 +1,6 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::thread::sleep;
use std::time::Duration;
use beacon_chain::blob_verification::AsBlock;
@@ -12,6 +13,7 @@ use lru_cache::LRUTimeCache;
use slog::{debug, error, trace, warn, Logger};
use smallvec::SmallVec;
use store::Hash256;
use types::blob_sidecar::BlobIdentifier;
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent};
use crate::metrics;
@@ -133,6 +135,37 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
pub fn search_blobs(
&mut self,
blob_ids: Vec<BlobIdentifier>,
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
) {
todo!()
}
pub fn search_block_delayed(
&mut self,
peer_id: PeerId,
hash: Hash256,
delay: Duration,
cx: &mut SyncNetworkContext<T>,
) {
//TODO(sean) handle delay
self.search_block(hash, peer_id, cx);
}
pub fn search_blobs_delayed(
&mut self,
peer_id: PeerId,
blob_ids: Vec<BlobIdentifier>,
delay: Duration,
cx: &mut SyncNetworkContext<T>,
) {
//TODO(sean) handle delay
self.search_blobs(blob_ids, peer_id, cx);
}
/// If a block is attempted to be processed but we do not know its parent, this function is
/// called in order to find the block's parent.
pub fn search_parent(
@@ -460,6 +493,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
BlockProcessResult::Ok => {
trace!(self.log, "Single block processing succeeded"; "block" => %root);
}
BlockProcessResult::MissingBlobs(blobs) => {
todo!()
}
BlockProcessResult::Ignored => {
// Beacon processor signalled to ignore the block processing result.
// This implies that the cpu is overloaded. Drop the request.
@@ -543,6 +579,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
BlockProcessResult::Ok => {
trace!(self.log, "Parent block processing succeeded"; &parent_lookup)
}
BlockProcessResult::MissingBlobs(blobs) => {
todo!()
}
BlockProcessResult::Err(e) => {
trace!(self.log, "Parent block processing failed"; &parent_lookup, "error" => %e)
}
@@ -557,6 +596,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
match result {
BlockProcessResult::MissingBlobs(blobs) => {
todo!()
}
BlockProcessResult::Err(BlockError::ParentUnknown(block)) => {
// need to keep looking for parents
// add the block back to the queue and continue the search

View File

@@ -44,7 +44,9 @@ use crate::status::ToStatusMessage;
use crate::sync::range_sync::ByRangeRequestType;
use beacon_chain::blob_verification::AsBlock;
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState};
use beacon_chain::{
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState,
};
use futures::StreamExt;
use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS;
use lighthouse_network::rpc::RPCError;
@@ -117,15 +119,21 @@ pub enum SyncMessage<T: EthSpec> {
/// A block with an unknown parent has been received.
UnknownBlock(PeerId, BlockWrapper<T>, Hash256),
/// A peer has sent an object that references a block that is unknown. This triggers the
/// A peer has sent an attestation that references a block that is unknown. This triggers the
/// manager to attempt to find the block matching the unknown hash.
UnknownBlockHash(PeerId, Hash256),
UnknownBlockHashFromAttestation(PeerId, Hash256),
/// A peer has sent a blob that references a block that is unknown. This triggers the
/// manager to attempt to find the block matching the unknown hash when the specified delay expires.
UnknownBlockHashFromGossipBlob(PeerId, Hash256, Duration),
/// A peer has sent us a block that we haven't received all the blobs for. This triggers
/// the manager to attempt to find the pending blobs for the given block root.
UnknownBlobHash {
/// the manager to attempt to find the pending blobs for the given block root when the specified
/// delay expires.
MissingBlobs {
peer_id: PeerId,
pending_blobs: Vec<BlobIdentifier>,
search_delay: Duration,
},
/// A peer has disconnected.
@@ -161,6 +169,7 @@ pub enum BlockProcessType {
#[derive(Debug)]
pub enum BlockProcessResult<T: EthSpec> {
Ok,
MissingBlobs(Vec<BlobIdentifier>),
Err(BlockError<T>),
Ignored,
}
@@ -597,18 +606,38 @@ impl<T: BeaconChainTypes> SyncManager<T> {
.search_parent(block_root, block, peer_id, &mut self.network);
}
}
SyncMessage::UnknownBlockHash(peer_id, block_hash) => {
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => {
// If we are not synced, ignore this block.
if self.network_globals.sync_state.read().is_synced()
&& self.network_globals.peers.read().is_connected(&peer_id)
&& self.network.is_execution_engine_online()
{
if self.synced_and_connected(&peer_id) {
self.block_lookups
.search_block(block_hash, peer_id, &mut self.network);
}
}
SyncMessage::UnknownBlobHash { .. } => {
unimplemented!()
SyncMessage::UnknownBlockHashFromGossipBlob(peer_id, block_hash, delay) => {
// If we are not synced, ignore this block.
if self.synced_and_connected(&peer_id) {
self.block_lookups.search_block_delayed(
peer_id,
block_hash,
delay,
&mut self.network,
);
}
}
SyncMessage::MissingBlobs {
peer_id,
pending_blobs,
search_delay,
} => {
// If we are not synced, ignore these blobs.
if self.synced_and_connected(&peer_id) {
self.block_lookups.search_blobs_delayed(
peer_id,
pending_blobs,
search_delay,
&mut self.network,
);
}
}
SyncMessage::Disconnect(peer_id) => {
self.peer_disconnect(&peer_id);
@@ -673,6 +702,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
fn synced_and_connected(&mut self, peer_id: &PeerId) -> bool {
self.network_globals.sync_state.read().is_synced()
&& self.network_globals.peers.read().is_connected(&peer_id)
&& self.network.is_execution_engine_online()
}
fn handle_new_execution_engine_state(&mut self, engine_state: EngineState) {
self.network.update_execution_engine_state(engine_state);
@@ -923,10 +958,18 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
impl<IgnoredOkVal, T: EthSpec> From<Result<IgnoredOkVal, BlockError<T>>> for BlockProcessResult<T> {
fn from(result: Result<IgnoredOkVal, BlockError<T>>) -> Self {
impl<T: EthSpec> From<Result<AvailabilityProcessingStatus, BlockError<T>>>
for BlockProcessResult<T>
{
fn from(result: Result<AvailabilityProcessingStatus, BlockError<T>>) -> Self {
match result {
Ok(_) => BlockProcessResult::Ok,
Ok(AvailabilityProcessingStatus::Imported(_)) => BlockProcessResult::Ok,
Ok(AvailabilityProcessingStatus::PendingBlock(_)) => {
todo!() // doesn't make sense
}
Ok(AvailabilityProcessingStatus::PendingBlobs(blobs)) => {
BlockProcessResult::MissingBlobs(blobs)
}
Err(e) => e.into(),
}
}