Merged with unstable

This commit is contained in:
Mark Mackey
2022-11-30 15:14:02 -06:00
125 changed files with 4287 additions and 1502 deletions

View File

@@ -41,11 +41,12 @@
use crate::sync::manager::BlockProcessType;
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::parking_lot::Mutex;
use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock};
use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock, NotifyExecutionLayer};
use derivative::Derivative;
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::rpc::LightClientBootstrapRequest;
use lighthouse_network::SignedBeaconBlockAndBlobsSidecar;
use lighthouse_network::{
rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage},
@@ -169,6 +170,10 @@ const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024;
/// is activated.
const MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN: usize = 16_384;
/// The maximum number of queued `LightClientBootstrapRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024;
/// The name of the manager tokio task.
const MANAGER_TASK_NAME: &str = "beacon_processor_manager";
@@ -210,6 +215,7 @@ pub const STATUS_PROCESSING: &str = "status_processing";
pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request";
pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request";
pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap";
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";
@@ -624,6 +630,22 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
}
}
/// Create a new work event to process `LightClientBootstrap`s from the RPC network.
pub fn lightclient_bootstrap_request(
peer_id: PeerId,
request_id: PeerRequestId,
request: LightClientBootstrapRequest,
) -> Self {
Self {
drop_during_sync: true,
work: Work::LightClientBootstrapRequest {
peer_id,
request_id,
request,
},
}
}
/// Get a `str` representation of the type of work this `WorkEvent` contains.
pub fn work_type(&self) -> &'static str {
self.work.str_id()
@@ -817,6 +839,11 @@ pub enum Work<T: BeaconChainTypes> {
peer_id: PeerId,
bls_to_execution_change: Box<SignedBlsToExecutionChange>,
},
LightClientBootstrapRequest {
peer_id: PeerId,
request_id: PeerRequestId,
request: LightClientBootstrapRequest,
},
}
impl<T: BeaconChainTypes> Work<T> {
@@ -841,6 +868,7 @@ impl<T: BeaconChainTypes> Work<T> {
Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST,
Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST,
Work::BlobsByRangeRequest { .. } => BLOBS_BY_RANGE_REQUEST,
Work::LightClientBootstrapRequest { .. } => LIGHT_CLIENT_BOOTSTRAP_REQUEST,
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Work::GossipBlsToExecutionChange { .. } => GOSSIP_BLS_TO_EXECUTION_CHANGE,
@@ -992,6 +1020,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
let mut gossip_bls_to_execution_change_queue =
FifoQueue::new(MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN);
let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN);
// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
// receive them back once they are ready (`ready_work_rx`).
let (ready_work_tx, ready_work_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
@@ -1236,6 +1265,8 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
} else if let Some(item) = backfill_chain_segment.pop() {
self.spawn_worker(item, toolbox);
// This statement should always be the final else statement.
} else if let Some(item) = lcbootstrap_queue.pop() {
self.spawn_worker(item, toolbox);
} else {
// Let the journal know that a worker is freed and there's nothing else
// for it to do.
@@ -1342,6 +1373,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::BlobsByRangeRequest { .. } => {
blbrange_queue.push(work, work_id, &self.log)
}
Work::LightClientBootstrapRequest { .. } => {
lcbootstrap_queue.push(work, work_id, &self.log)
}
Work::UnknownBlockAttestation { .. } => {
unknown_block_attestation_queue.push(work)
}
@@ -1700,8 +1734,24 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
/*
* Verification for a chain segment (multiple blocks).
*/
Work::ChainSegment { process_id, blocks } => task_spawner
.spawn_async(async move { worker.process_chain_segment(process_id, blocks).await }),
Work::ChainSegment { process_id, blocks } => {
let notify_execution_layer = if self
.network_globals
.sync_state
.read()
.is_syncing_finalized()
{
NotifyExecutionLayer::No
} else {
NotifyExecutionLayer::Yes
};
task_spawner.spawn_async(async move {
worker
.process_chain_segment(process_id, blocks, notify_execution_layer)
.await
})
}
/*
* Processing of Status Messages.
*/
@@ -1740,7 +1790,6 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
request,
)
}),
Work::BlobsByRangeRequest {
peer_id,
request_id,
@@ -1754,7 +1803,16 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
request,
)
}),
/*
* Processing of lightclient bootstrap requests from other peers.
*/
Work::LightClientBootstrapRequest {
peer_id,
request_id,
request,
} => task_spawner.spawn_blocking(move || {
worker.handle_light_client_bootstrap(peer_id, request_id, request)
}),
Work::UnknownBlockAttestation {
message_id,
peer_id,

View File

@@ -8,7 +8,7 @@ use beacon_chain::{
sync_committee_verification::{self, Error as SyncCommitteeError},
validator_monitor::get_block_delay_ms,
BeaconChainError, BeaconChainTypes, BlockError, CountUnrealized, ForkChoiceError,
GossipVerifiedBlock,
GossipVerifiedBlock, NotifyExecutionLayer,
};
use lighthouse_network::{
Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource,
@@ -812,7 +812,7 @@ impl<T: BeaconChainTypes> Worker<T> {
| Err(e @ BlockError::BlockIsAlreadyKnown)
| Err(e @ BlockError::RepeatProposal { .. })
| Err(e @ BlockError::NotFinalizedDescendant { .. }) => {
debug!(self.log, "Could not verify block for gossip, ignoring the block";
debug!(self.log, "Could not verify block for gossip. Ignoring the block";
"error" => %e);
// Prevent recurring behaviour by penalizing the peer slightly.
self.gossip_penalize_peer(
@@ -824,7 +824,7 @@ impl<T: BeaconChainTypes> Worker<T> {
return None;
}
Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => {
debug!(self.log, "Could not verify block for gossip, ignoring the block";
debug!(self.log, "Could not verify block for gossip. Ignoring the block";
"error" => %e);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return None;
@@ -846,7 +846,7 @@ impl<T: BeaconChainTypes> Worker<T> {
// TODO(merge): reconsider peer scoring for this event.
| Err(e @ BlockError::ParentExecutionPayloadInvalid { .. })
| Err(e @ BlockError::GenesisBlock) => {
warn!(self.log, "Could not verify block for gossip, rejecting the block";
warn!(self.log, "Could not verify block for gossip. Rejecting the block";
"error" => %e);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(
@@ -953,7 +953,12 @@ impl<T: BeaconChainTypes> Worker<T> {
match self
.chain
.process_block(block_root, verified_block, CountUnrealized::True)
.process_block(
block_root,
verified_block,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await
{
Ok(block_root) => {

View File

@@ -38,7 +38,7 @@ impl<T: BeaconChainTypes> Worker<T> {
/// Creates a log if there is an internal error.
fn send_network_message(&self, message: NetworkMessage<T::EthSpec>) {
self.network_tx.send(message).unwrap_or_else(|e| {
debug!(self.log, "Could not send message to the network service, likely shutdown";
debug!(self.log, "Could not send message to the network service. Likely shutdown";
"error" => %e)
});
}

View File

@@ -12,7 +12,7 @@ use slog::{debug, error};
use slot_clock::SlotClock;
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::{Epoch, EthSpec, Hash256, Slot};
use types::{light_client_bootstrap::LightClientBootstrap, Epoch, EthSpec, Hash256, Slot};
use super::Worker;
@@ -205,6 +205,79 @@ impl<T: BeaconChainTypes> Worker<T> {
)
}
/// Handle a `BlocksByRoot` request from the peer.
pub fn handle_light_client_bootstrap(
self,
peer_id: PeerId,
request_id: PeerRequestId,
request: LightClientBootstrapRequest,
) {
let block_root = request.root;
let state_root = match self.chain.get_blinded_block(&block_root) {
Ok(signed_block) => match signed_block {
Some(signed_block) => signed_block.state_root(),
None => {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Bootstrap not avaiable".into(),
request_id,
);
return;
}
},
Err(_) => {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Bootstrap not avaiable".into(),
request_id,
);
return;
}
};
let mut beacon_state = match self.chain.get_state(&state_root, None) {
Ok(beacon_state) => match beacon_state {
Some(state) => state,
None => {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Bootstrap not avaiable".into(),
request_id,
);
return;
}
},
Err(_) => {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Bootstrap not avaiable".into(),
request_id,
);
return;
}
};
let bootstrap = match LightClientBootstrap::from_beacon_state(&mut beacon_state) {
Ok(bootstrap) => bootstrap,
Err(_) => {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Bootstrap not avaiable".into(),
request_id,
);
return;
}
};
self.send_response(
peer_id,
Response::LightClientBootstrap(bootstrap),
request_id,
)
}
/// Handle a `BlocksByRange` request from the peer.
pub fn handle_blocks_by_range_request(
self,

View File

@@ -10,6 +10,7 @@ use crate::sync::{BatchProcessResult, ChainId};
use beacon_chain::CountUnrealized;
use beacon_chain::{
BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError,
NotifyExecutionLayer,
};
use lighthouse_network::PeerAction;
use slog::{debug, error, info, warn};
@@ -85,7 +86,12 @@ impl<T: BeaconChainTypes> Worker<T> {
let slot = block.slot();
let result = self
.chain
.process_block(block_root, block, CountUnrealized::True)
.process_block(
block_root,
block,
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
.await;
metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
@@ -127,6 +133,7 @@ impl<T: BeaconChainTypes> Worker<T> {
&self,
sync_type: ChainSegmentProcessId,
downloaded_blocks: Vec<Arc<SignedBeaconBlock<T::EthSpec>>>,
notify_execution_layer: NotifyExecutionLayer,
) {
let result = match sync_type {
// this a request from the range sync
@@ -136,7 +143,11 @@ impl<T: BeaconChainTypes> Worker<T> {
let sent_blocks = downloaded_blocks.len();
match self
.process_blocks(downloaded_blocks.iter(), count_unrealized)
.process_blocks(
downloaded_blocks.iter(),
count_unrealized,
notify_execution_layer,
)
.await
{
(_, Ok(_)) => {
@@ -215,7 +226,11 @@ impl<T: BeaconChainTypes> Worker<T> {
// parent blocks are ordered from highest slot to lowest, so we need to process in
// reverse
match self
.process_blocks(downloaded_blocks.iter().rev(), CountUnrealized::True)
.process_blocks(
downloaded_blocks.iter().rev(),
CountUnrealized::True,
notify_execution_layer,
)
.await
{
(imported_blocks, Err(e)) => {
@@ -246,11 +261,12 @@ impl<T: BeaconChainTypes> Worker<T> {
&self,
downloaded_blocks: impl Iterator<Item = &'a Arc<SignedBeaconBlock<T::EthSpec>>>,
count_unrealized: CountUnrealized,
notify_execution_layer: NotifyExecutionLayer,
) -> (usize, Result<(), ChainSegmentFailed>) {
let blocks: Vec<Arc<_>> = downloaded_blocks.cloned().collect();
match self
.chain
.process_chain_segment(blocks, count_unrealized)
.process_chain_segment(blocks, count_unrealized, notify_execution_layer)
.await
{
ChainSegmentResult::Successful { imported_blocks } => {
@@ -428,7 +444,7 @@ impl<T: BeaconChainTypes> Worker<T> {
} else {
// The block is in the future, but not too far.
debug!(
self.log, "Block is slightly ahead of our slot clock, ignoring.";
self.log, "Block is slightly ahead of our slot clock. Ignoring.";
"present_slot" => present_slot,
"block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,