merge upstream

This commit is contained in:
realbigsean
2022-11-26 10:01:57 -05:00
29 changed files with 557 additions and 69 deletions

View File

@@ -63,8 +63,8 @@ use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedContributionAndProof,
SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId,
SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId,
};
use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork,
@@ -166,6 +166,12 @@ const MAX_BLOBS_BY_RANGE_QUEUE_LEN: usize = 1_024;
/// will be stored before we start dropping them.
const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024;
/// Maximum number of `SignedBlsToExecutionChange` messages to queue before dropping them.
///
/// This value is set high to accommodate the large spike that is expected immediately after Capella
/// is activated.
const MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN: usize = 16_384;
/// The name of the manager tokio task.
const MANAGER_TASK_NAME: &str = "beacon_processor_manager";
@@ -210,6 +216,7 @@ pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request";
pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request";
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";
pub const BLOB_CHAIN_SEGMENT: &str = "blob_chain_segment";
/// A simple first-in-first-out queue with a maximum length.
@@ -520,6 +527,22 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
}
}
/// Create a new `Work` event for some BLS to execution change.
pub fn gossip_bls_to_execution_change(
message_id: MessageId,
peer_id: PeerId,
bls_to_execution_change: Box<SignedBlsToExecutionChange>,
) -> Self {
Self {
drop_during_sync: false,
work: Work::GossipBlsToExecutionChange {
message_id,
peer_id,
bls_to_execution_change,
},
}
}
/// Create a new `Work` event for some block, where the result from computation (if any) is
/// sent to the other side of `result_tx`.
pub fn rpc_beacon_block(
@@ -822,6 +845,11 @@ pub enum Work<T: BeaconChainTypes> {
request_id: PeerRequestId,
request: BlobsByRangeRequest,
},
GossipBlsToExecutionChange {
message_id: MessageId,
peer_id: PeerId,
bls_to_execution_change: Box<SignedBlsToExecutionChange>,
},
BlobsByRootsRequest {
peer_id: PeerId,
request_id: PeerRequestId,
@@ -858,6 +886,7 @@ impl<T: BeaconChainTypes> Work<T> {
Work::BlobsByRootsRequest { .. } => BLOBS_BY_ROOTS_REQUEST,
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Work::GossipBlsToExecutionChange { .. } => GOSSIP_BLS_TO_EXECUTION_CHANGE,
Work::BlobChainSegment { .. } => BLOB_CHAIN_SEGMENT,
}
}
@@ -1005,6 +1034,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN);
let mut blbrange_queue = FifoQueue::new(MAX_BLOBS_BY_RANGE_QUEUE_LEN);
let mut gossip_bls_to_execution_change_queue =
FifoQueue::new(MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN);
// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
// receive them back once they are ready (`ready_work_rx`).
let (ready_work_tx, ready_work_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN);
@@ -1244,9 +1276,12 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
self.spawn_worker(item, toolbox);
} else if let Some(item) = gossip_proposer_slashing_queue.pop() {
self.spawn_worker(item, toolbox);
// Check exits last since our validators don't get rewards from them.
// Check exits and address changes late since our validators don't get
// rewards from them.
} else if let Some(item) = gossip_voluntary_exit_queue.pop() {
self.spawn_worker(item, toolbox);
} else if let Some(item) = gossip_bls_to_execution_change_queue.pop() {
self.spawn_worker(item, toolbox);
// Handle backfill sync chain segments.
} else if let Some(item) = backfill_chain_segment.pop() {
self.spawn_worker(item, toolbox);
@@ -1363,6 +1398,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::UnknownBlockAggregate { .. } => {
unknown_block_aggregate_queue.push(work)
}
Work::GossipBlsToExecutionChange { .. } => {
gossip_bls_to_execution_change_queue.push(work, work_id, &self.log)
}
Work::BlobsByRootsRequest {
peer_id,
request_id,
@@ -1423,6 +1461,10 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_QUEUE_TOTAL,
gossip_attester_slashing_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_QUEUE_TOTAL,
gossip_bls_to_execution_change_queue.len() as i64,
);
if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
error!(
@@ -1685,6 +1727,20 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
seen_timestamp,
)
}),
/*
* BLS to execution change verification.
*/
Work::GossipBlsToExecutionChange {
message_id,
peer_id,
bls_to_execution_change,
} => task_spawner.spawn_blocking(move || {
worker.process_gossip_bls_to_execution_change(
message_id,
peer_id,
*bls_to_execution_change,
)
}),
/*
* Verification for beacon blocks received during syncing via RPC.
*/

View File

@@ -21,8 +21,8 @@ use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, BlobsSidecar, EthSpec, Hash256, IndexedAttestation,
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar,
SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage,
SyncSubnetId,
SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId,
SyncCommitteeMessage, SyncSubnetId,
};
use super::{
@@ -1181,6 +1181,65 @@ impl<T: BeaconChainTypes> Worker<T> {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_IMPORTED_TOTAL);
}
pub fn process_gossip_bls_to_execution_change(
self,
message_id: MessageId,
peer_id: PeerId,
bls_to_execution_change: SignedBlsToExecutionChange,
) {
let validator_index = bls_to_execution_change.message.validator_index;
let address = bls_to_execution_change.message.to_execution_address;
let change = match self
.chain
.verify_bls_to_execution_change_for_gossip(bls_to_execution_change)
{
Ok(ObservationOutcome::New(change)) => change,
Ok(ObservationOutcome::AlreadyKnown) => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
debug!(
self.log,
"Dropping BLS to execution change";
"validator_index" => validator_index,
"peer" => %peer_id
);
return;
}
Err(e) => {
debug!(
self.log,
"Dropping invalid BLS to execution change";
"validator_index" => validator_index,
"peer" => %peer_id,
"error" => ?e
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
// We penalize the peer slightly to prevent overuse of invalids.
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"invalid_bls_to_execution_change",
);
return;
}
};
metrics::inc_counter(&metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_VERIFIED_TOTAL);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
self.chain.import_bls_to_execution_change(change);
debug!(
self.log,
"Successfully imported BLS to execution change";
"validator_index" => validator_index,
"address" => ?address,
);
metrics::inc_counter(&metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_IMPORTED_TOTAL);
}
/// Process the sync committee signature received from the gossip network and:
///
/// - If it passes gossip propagation criteria, tell the network thread to forward it.

View File

@@ -143,6 +143,19 @@ lazy_static! {
"beacon_processor_attester_slashing_imported_total",
"Total number of attester slashings imported to the op pool."
);
// Gossip BLS to execution changes.
pub static ref BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_bls_to_execution_change_queue_total",
"Count of address changes from gossip waiting to be verified."
);
pub static ref BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_bls_to_execution_change_verified_total",
"Total number of address changes verified for propagation."
);
pub static ref BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_bls_to_execution_change_imported_total",
"Total number of address changes imported to the op pool."
);
// Rpc blocks.
pub static ref BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_rpc_block_queue_total",

View File

@@ -298,6 +298,18 @@ impl<T: BeaconChainTypes> Router<T> {
sync_committtee_msg.0,
);
}
PubsubMessage::BlsToExecutionChange(bls_to_execution_change) => {
trace!(
self.log,
"Received BLS to execution change";
"peer_id" => %peer_id
);
self.processor.on_bls_to_execution_change_gossip(
id,
peer_id,
bls_to_execution_change,
);
}
}
}
}

View File

@@ -20,7 +20,8 @@ use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, BlobsSidecar, EthSpec, ProposerSlashing,
SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar,
SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId,
SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId,
SyncSubnetId,
};
/// Processes validated messages from the network. It relays necessary data to the syncing thread
@@ -462,6 +463,19 @@ impl<T: BeaconChainTypes> Processor<T> {
))
}
pub fn on_bls_to_execution_change_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
bls_to_execution_change: Box<SignedBlsToExecutionChange>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_bls_to_execution_change(
message_id,
peer_id,
bls_to_execution_change,
))
}
fn send_beacon_processor_work(&mut self, work: BeaconWorkEvent<T>) {
self.beacon_processor_send
.try_send(work)