This commit is contained in:
realbigsean
2022-12-30 11:00:14 -05:00
parent 222a514506
commit d8f7277beb
18 changed files with 109 additions and 302 deletions

View File

@@ -115,7 +115,8 @@ const MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 1_024;
/// before we start dropping them.
const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024;
//FIXME(sean) verify
/// The maximum number of queued `SignedBeaconBlockAndBlobsSidecar` objects received on gossip that
/// will be stored before we start dropping them.
const MAX_GOSSIP_BLOCK_AND_BLOB_QUEUE_LEN: usize = 1_024;
/// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but
@@ -1186,7 +1187,6 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
// required to verify some attestations.
} else if let Some(item) = gossip_block_queue.pop() {
self.spawn_worker(item, toolbox);
//FIXME(sean)
} else if let Some(item) = gossip_block_and_blobs_sidecar_queue.pop() {
self.spawn_worker(item, toolbox);
// Check the aggregates, *then* the unaggregates since we assume that
@@ -1675,23 +1675,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
/*
* Verification for blobs sidecars received on gossip.
*/
Work::GossipBlockAndBlobsSidecar {
message_id,
peer_id,
peer_client,
block_and_blobs,
seen_timestamp,
} => task_spawner.spawn_async(async move {
worker
.process_gossip_block_and_blobs_sidecar(
message_id,
peer_id,
peer_client,
block_and_blobs,
seen_timestamp,
)
.await
}),
Work::GossipBlockAndBlobsSidecar { .. } => {
warn!(self.log, "Unexpected block and blobs on gossip")
}
/*
* Import for blocks that we received earlier than their intended slot.
*/
@@ -1892,19 +1878,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
request,
)
}),
Work::BlobsByRangeRequest {
peer_id,
request_id,
request,
} => task_spawner.spawn_blocking_with_manual_send_idle(move |send_idle_on_drop| {
worker.handle_blobs_by_range_request(
sub_executor,
send_idle_on_drop,
peer_id,
request_id,
request,
)
}),
Work::BlobsByRangeRequest { .. } => {
warn!(self.log.clone(), "Unexpected BlobsByRange Request")
}
/*
* Processing of lightclient bootstrap requests from other peers.
*/

View File

@@ -11,10 +11,7 @@ use beacon_chain::{
BeaconChainError, BeaconChainTypes, BlockError, CountUnrealized, ForkChoiceError,
GossipVerifiedBlock, NotifyExecutionLayer,
};
use lighthouse_network::{
Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource,
SignedBeaconBlockAndBlobsSidecar,
};
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
use slog::{crit, debug, error, info, trace, warn};
use slot_clock::SlotClock;
use ssz::Encode;
@@ -699,19 +696,6 @@ impl<T: BeaconChainTypes> Worker<T> {
}
}
#[allow(clippy::too_many_arguments)]
pub async fn process_gossip_block_and_blobs_sidecar(
self,
_message_id: MessageId,
_peer_id: PeerId,
_peer_client: Client,
_block_and_blob: Arc<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
_seen_timestamp: Duration,
) {
//FIXME
unimplemented!()
}
/// Process the beacon block received from the gossip network and
/// if it passes gossip propagation criteria, tell the network thread to forward it.
///

View File

@@ -4,7 +4,6 @@ use crate::status::ToStatusMessage;
use crate::sync::SyncMessage;
use beacon_chain::{BeaconChainError, BeaconChainTypes, HistoricalBlockError, WhenSlotSkipped};
use itertools::process_results;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MAX_REQUEST_BLOBS_SIDECARS};
use lighthouse_network::rpc::StatusMessage;
use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
@@ -455,152 +454,4 @@ impl<T: BeaconChainTypes> Worker<T> {
"load_blocks_by_range_blocks",
);
}
/// Handle a `BlobsByRange` request from the peer.
pub fn handle_blobs_by_range_request(
self,
_executor: TaskExecutor,
_send_on_drop: SendOnDrop,
peer_id: PeerId,
_request_id: PeerRequestId,
mut req: BlobsByRangeRequest,
) {
debug!(self.log, "Received BlobsByRange Request";
"peer_id" => %peer_id,
"count" => req.count,
"start_slot" => req.start_slot,
);
// Should not send more than max request blocks
if req.count > MAX_REQUEST_BLOBS_SIDECARS {
req.count = MAX_REQUEST_BLOBS_SIDECARS;
}
//FIXME(sean) create the blobs iter
// let forwards_block_root_iter = match self
// .chain
// .forwards_iter_block_roots(Slot::from(req.start_slot))
// {
// Ok(iter) => iter,
// Err(BeaconChainError::HistoricalBlockError(
// HistoricalBlockError::BlockOutOfRange {
// slot,
// oldest_block_slot,
// },
// )) => {
// debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot);
// return self.send_error_response(
// peer_id,
// RPCResponseErrorCode::ResourceUnavailable,
// "Backfilling".into(),
// request_id,
// );
// }
// Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e),
// };
//
// // Pick out the required blocks, ignoring skip-slots.
// let mut last_block_root = None;
// let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
// iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
// // map skip slots to None
// .map(|(root, _)| {
// let result = if Some(root) == last_block_root {
// None
// } else {
// Some(root)
// };
// last_block_root = Some(root);
// result
// })
// .collect::<Vec<Option<Hash256>>>()
// });
//
// let block_roots = match maybe_block_roots {
// Ok(block_roots) => block_roots,
// Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e),
// };
//
// // remove all skip slots
// let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
//
// // Fetching blocks is async because it may have to hit the execution layer for payloads.
// executor.spawn(
// async move {
// let mut blocks_sent = 0;
// let mut send_response = true;
//
// for root in block_roots {
// match self.chain.store.get_blobs(&root) {
// Ok(Some(blob)) => {
// blocks_sent += 1;
// self.send_network_message(NetworkMessage::SendResponse {
// peer_id,
// response: Response::BlobsByRange(Some(Arc::new(VariableList::new(vec![blob.message]).unwrap()))),
// id: request_id,
// });
// }
// Ok(None) => {
// error!(
// self.log,
// "Blob in the chain is not in the store";
// "request_root" => ?root
// );
// break;
// }
// Err(e) => {
// error!(
// self.log,
// "Error fetching block for peer";
// "block_root" => ?root,
// "error" => ?e
// );
// break;
// }
// }
// }
//
// let current_slot = self
// .chain
// .slot()
// .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
//
// if blocks_sent < (req.count as usize) {
// debug!(
// self.log,
// "BlocksByRange Response processed";
// "peer" => %peer_id,
// "msg" => "Failed to return all requested blocks",
// "start_slot" => req.start_slot,
// "current_slot" => current_slot,
// "requested" => req.count,
// "returned" => blocks_sent
// );
// } else {
// debug!(
// self.log,
// "BlocksByRange Response processed";
// "peer" => %peer_id,
// "start_slot" => req.start_slot,
// "current_slot" => current_slot,
// "requested" => req.count,
// "returned" => blocks_sent
// );
// }
//
// if send_response {
// // send the stream terminator
// self.send_network_message(NetworkMessage::SendResponse {
// peer_id,
// response: Response::BlobsByRange(None),
// id: request_id,
// });
// }
//
// drop(send_on_drop);
// },
// "load_blocks_by_range_blocks",
// );
}
}

View File

@@ -47,7 +47,7 @@ use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS;
use lighthouse_network::types::{NetworkGlobals, SyncState};
use lighthouse_network::SyncInfo;
use lighthouse_network::{PeerAction, PeerId};
use slog::{crit, debug, error, info, trace, Logger};
use slog::{crit, debug, error, info, trace, warn, Logger};
use std::boxed::Box;
use std::ops::Sub;
use std::sync::Arc;
@@ -592,8 +592,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
.block_lookups
.parent_chain_processed(chain_hash, result, &mut self.network),
},
//FIXME(sean)
SyncMessage::RpcBlob { .. } => todo!(),
SyncMessage::RpcBlob { .. } => {
warn!(self.log, "Unexpected blob message received");
}
}
}