Merge branch 'unstable' into deneb-merge-from-unstable-20230627

# Conflicts:
#	beacon_node/beacon_chain/src/beacon_chain.rs
#	beacon_node/beacon_chain/src/block_verification.rs
#	beacon_node/beacon_chain/src/lib.rs
#	beacon_node/beacon_chain/src/test_utils.rs
#	beacon_node/beacon_chain/tests/block_verification.rs
#	beacon_node/beacon_chain/tests/store_tests.rs
#	beacon_node/beacon_chain/tests/tests.rs
#	beacon_node/http_api/src/publish_blocks.rs
#	beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs
#	beacon_node/lighthouse_network/src/rpc/methods.rs
#	beacon_node/lighthouse_network/src/rpc/outbound.rs
#	beacon_node/lighthouse_network/src/rpc/protocol.rs
#	beacon_node/lighthouse_network/src/service/api_types.rs
#	beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
#	beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
#	beacon_node/network/src/beacon_processor/worker/sync_methods.rs
#	beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
#	beacon_node/network/src/sync/network_context.rs
#	beacon_node/network/src/sync/range_sync/batch.rs
#	beacon_node/network/src/sync/range_sync/chain.rs
#	common/eth2/src/types.rs
#	consensus/fork_choice/src/fork_choice.rs
This commit is contained in:
Jimmy Chen
2023-06-27 08:38:54 +10:00
81 changed files with 1675 additions and 1243 deletions

View File

@@ -9,8 +9,8 @@ use beacon_chain::{
observed_operations::ObservationOutcome,
sync_committee_verification::{self, Error as SyncCommitteeError},
validator_monitor::get_block_delay_ms,
AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, CountUnrealized,
ForkChoiceError, GossipVerifiedBlock, NotifyExecutionLayer,
AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError,
GossipVerifiedBlock, NotifyExecutionLayer,
};
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
use operation_pool::ReceivedPreCapella;
@@ -756,11 +756,7 @@ impl<T: BeaconChainTypes> Worker<T> {
let blob_root = verified_blob.block_root();
let blob_slot = verified_blob.slot();
let blob_clone = verified_blob.clone().to_blob();
match self
.chain
.process_blob(verified_blob, CountUnrealized::True)
.await
{
match self.chain.process_blob(verified_blob).await {
Ok(AvailabilityProcessingStatus::Imported(_hash)) => {
//TODO(sean) add metrics and logging
self.chain.recompute_head_at_current_slot().await;
@@ -978,7 +974,6 @@ impl<T: BeaconChainTypes> Worker<T> {
| Err(e @ BlockError::NonLinearParentRoots)
| Err(e @ BlockError::BlockIsNotLaterThanParent { .. })
| Err(e @ BlockError::InvalidSignature)
| Err(e @ BlockError::TooManySkippedSlots { .. })
| Err(e @ BlockError::WeakSubjectivityConflict)
| Err(e @ BlockError::InconsistentFork(_))
| Err(e @ BlockError::ExecutionPayloadError(_))
@@ -1103,12 +1098,7 @@ impl<T: BeaconChainTypes> Worker<T> {
let result = self
.chain
.process_block(
block_root,
verified_block,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.process_block(block_root, verified_block, NotifyExecutionLayer::Yes)
.await;
match &result {

View File

@@ -139,10 +139,10 @@ impl<T: BeaconChainTypes> Worker<T> {
request_id: PeerRequestId,
request: BlocksByRootRequest,
) {
let requested_blocks = request.block_roots.len();
let requested_blocks = request.block_roots().len();
let mut block_stream = match self
.chain
.get_blocks_checking_early_attester_cache(request.block_roots.into(), &executor)
.get_blocks_checking_early_attester_cache(request.block_roots().to_vec(), &executor)
{
Ok(block_stream) => block_stream,
Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e),
@@ -375,27 +375,22 @@ impl<T: BeaconChainTypes> Worker<T> {
send_on_drop: SendOnDrop,
peer_id: PeerId,
request_id: PeerRequestId,
req: BlocksByRangeRequest,
mut req: BlocksByRangeRequest,
) {
debug!(self.log, "Received BlocksByRange Request";
"peer_id" => %peer_id,
"count" => req.count,
"start_slot" => req.start_slot,
"count" => req.count(),
"start_slot" => req.start_slot(),
);
// Should not send more than max request blocks
if req.count > MAX_REQUEST_BLOCKS {
return self.send_error_response(
peer_id,
RPCResponseErrorCode::InvalidRequest,
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`".into(),
request_id,
);
if *req.count() > MAX_REQUEST_BLOCKS {
*req.count_mut() = MAX_REQUEST_BLOCKS;
}
let forwards_block_root_iter = match self
.chain
.forwards_iter_block_roots(Slot::from(req.start_slot))
.forwards_iter_block_roots(Slot::from(*req.start_slot()))
{
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockError(
@@ -432,7 +427,7 @@ impl<T: BeaconChainTypes> Worker<T> {
// Pick out the required blocks, ignoring skip-slots.
let mut last_block_root = req
.start_slot
.start_slot()
.checked_sub(1)
.map(|prev_slot| {
self.chain
@@ -443,18 +438,20 @@ impl<T: BeaconChainTypes> Worker<T> {
.flatten()
.flatten();
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
// map skip slots to None
.map(|(root, _)| {
let result = if Some(root) == last_block_root {
None
} else {
Some(root)
};
last_block_root = Some(root);
result
})
.collect::<Vec<Option<Hash256>>>()
iter.take_while(|(_, slot)| {
slot.as_u64() < req.start_slot().saturating_add(*req.count())
})
// map skip slots to None
.map(|(root, _)| {
let result = if Some(root) == last_block_root {
None
} else {
Some(root)
};
last_block_root = Some(root);
result
})
.collect::<Vec<Option<Hash256>>>()
});
let block_roots = match maybe_block_roots {
@@ -487,8 +484,8 @@ impl<T: BeaconChainTypes> Worker<T> {
Ok(Some(block)) => {
// Due to skip slots, blocks could be out of the range, we ensure they
// are in the range before sending
if block.slot() >= req.start_slot
&& block.slot() < req.start_slot + req.count
if block.slot() >= *req.start_slot()
&& block.slot() < req.start_slot() + req.count()
{
blocks_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
@@ -572,15 +569,15 @@ impl<T: BeaconChainTypes> Worker<T> {
.slot()
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
if blocks_sent < (req.count as usize) {
if blocks_sent < (*req.count() as usize) {
debug!(
self.log,
"BlocksByRange outgoing response processed";
"peer" => %peer_id,
"msg" => "Failed to return all requested blocks",
"start_slot" => req.start_slot,
"start_slot" => req.start_slot(),
"current_slot" => current_slot,
"requested" => req.count,
"requested" => req.count(),
"returned" => blocks_sent
);
} else {
@@ -588,9 +585,9 @@ impl<T: BeaconChainTypes> Worker<T> {
self.log,
"BlocksByRange outgoing response processed";
"peer" => %peer_id,
"start_slot" => req.start_slot,
"start_slot" => req.start_slot(),
"current_slot" => current_slot,
"requested" => req.count,
"requested" => req.count(),
"returned" => blocks_sent
);
}

View File

@@ -10,12 +10,12 @@ use crate::sync::{BatchProcessResult, ChainId};
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock};
use beacon_chain::data_availability_checker::AvailabilityCheckError;
use beacon_chain::AvailabilityProcessingStatus;
use beacon_chain::{
observed_block_producers::Error as ObserveError, validator_monitor::get_block_delay_ms,
BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError,
NotifyExecutionLayer,
};
use beacon_chain::{AvailabilityProcessingStatus, CountUnrealized};
use lighthouse_network::PeerAction;
use slog::{debug, error, info, warn};
use slot_clock::SlotClock;
@@ -28,7 +28,7 @@ use types::{Epoch, Hash256};
#[derive(Clone, Debug, PartialEq)]
pub enum ChainSegmentProcessId {
/// Processing Id of a range syncing batch.
RangeBatchId(ChainId, Epoch, CountUnrealized),
RangeBatchId(ChainId, Epoch),
/// Processing ID for a backfill syncing batch.
BackSyncBatchId(Epoch),
/// Processing Id of the parent lookup of a block.
@@ -171,12 +171,7 @@ impl<T: BeaconChainTypes> Worker<T> {
let result = self
.chain
.process_block(
block_root,
block,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.process_block(block_root, block, NotifyExecutionLayer::Yes)
.await;
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
@@ -233,15 +228,11 @@ impl<T: BeaconChainTypes> Worker<T> {
let result = self
.chain
.check_availability_and_maybe_import(
slot,
|chain| {
chain
.data_availability_checker
.put_rpc_blobs(block_root, blobs)
},
CountUnrealized::True,
)
.check_availability_and_maybe_import(slot, |chain| {
chain
.data_availability_checker
.put_rpc_blobs(block_root, blobs)
})
.await;
// Sync handles these results
@@ -262,17 +253,13 @@ impl<T: BeaconChainTypes> Worker<T> {
) {
let result = match sync_type {
// this a request from the range sync
ChainSegmentProcessId::RangeBatchId(chain_id, epoch, count_unrealized) => {
ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => {
let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64());
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
let sent_blocks = downloaded_blocks.len();
match self
.process_blocks(
downloaded_blocks.iter(),
count_unrealized,
notify_execution_layer,
)
.process_blocks(downloaded_blocks.iter(), notify_execution_layer)
.await
{
(_, Ok(_)) => {
@@ -357,11 +344,7 @@ impl<T: BeaconChainTypes> Worker<T> {
// parent blocks are ordered from highest slot to lowest, so we need to process in
// reverse
match self
.process_blocks(
downloaded_blocks.iter().rev(),
CountUnrealized::True,
notify_execution_layer,
)
.process_blocks(downloaded_blocks.iter().rev(), notify_execution_layer)
.await
{
(imported_blocks, Err(e)) => {
@@ -391,13 +374,12 @@ impl<T: BeaconChainTypes> Worker<T> {
async fn process_blocks<'a>(
&self,
downloaded_blocks: impl Iterator<Item = &'a BlockWrapper<T::EthSpec>>,
count_unrealized: CountUnrealized,
notify_execution_layer: NotifyExecutionLayer,
) -> (usize, Result<(), ChainSegmentFailed>) {
let blocks: Vec<_> = downloaded_blocks.cloned().collect();
match self
.chain
.process_chain_segment(blocks, count_unrealized, notify_execution_layer)
.process_chain_segment(blocks, notify_execution_layer)
.await
{
ChainSegmentResult::Successful { imported_blocks } => {