mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-01 11:43:44 +00:00
merge
This commit is contained in:
@@ -22,7 +22,7 @@ pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS_TOTAL: &s
|
||||
"validator_monitor_attestation_simulator_source_attester_miss_total";
|
||||
|
||||
/*
|
||||
* Execution Payload Envelope Procsesing
|
||||
* Execution Payload Envelope Processing
|
||||
*/
|
||||
|
||||
pub static ENVELOPE_PROCESSING_REQUESTS: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
|
||||
|
||||
@@ -5,7 +5,7 @@ use state_processing::{
|
||||
VerifySignatures,
|
||||
envelope_processing::{VerifyStateRoot, process_execution_payload_envelope},
|
||||
};
|
||||
use types::{EthSpec, SignedExecutionPayloadEnvelope};
|
||||
use types::EthSpec;
|
||||
|
||||
use crate::{
|
||||
BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer,
|
||||
@@ -18,24 +18,14 @@ use crate::{
|
||||
},
|
||||
};
|
||||
|
||||
pub trait IntoExecutionPendingEnvelope<T: BeaconChainTypes>: Sized {
|
||||
fn into_execution_pending_envelope(
|
||||
self,
|
||||
chain: &Arc<BeaconChain<T>>,
|
||||
notify_execution_layer: NotifyExecutionLayer,
|
||||
) -> Result<ExecutionPendingEnvelope<T::EthSpec>, EnvelopeError>;
|
||||
|
||||
fn envelope(&self) -> &Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>;
|
||||
}
|
||||
|
||||
pub struct ExecutionPendingEnvelope<E: EthSpec> {
|
||||
pub signed_envelope: MaybeAvailableEnvelope<E>,
|
||||
pub import_data: EnvelopeImportData<E>,
|
||||
pub payload_verification_handle: PayloadVerificationHandle,
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> IntoExecutionPendingEnvelope<T> for GossipVerifiedEnvelope<T> {
|
||||
fn into_execution_pending_envelope(
|
||||
impl<T: BeaconChainTypes> GossipVerifiedEnvelope<T> {
|
||||
pub fn into_execution_pending_envelope(
|
||||
self,
|
||||
chain: &Arc<BeaconChain<T>>,
|
||||
notify_execution_layer: NotifyExecutionLayer,
|
||||
@@ -116,25 +106,4 @@ impl<T: BeaconChainTypes> IntoExecutionPendingEnvelope<T> for GossipVerifiedEnve
|
||||
payload_verification_handle,
|
||||
})
|
||||
}
|
||||
|
||||
fn envelope(&self) -> &Arc<SignedExecutionPayloadEnvelope<T::EthSpec>> {
|
||||
&self.signed_envelope
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> IntoExecutionPendingEnvelope<T>
|
||||
for Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>
|
||||
{
|
||||
fn into_execution_pending_envelope(
|
||||
self,
|
||||
chain: &Arc<BeaconChain<T>>,
|
||||
notify_execution_layer: NotifyExecutionLayer,
|
||||
) -> Result<ExecutionPendingEnvelope<T::EthSpec>, EnvelopeError> {
|
||||
GossipVerifiedEnvelope::new(self, &chain.gossip_verification_context())?
|
||||
.into_execution_pending_envelope(chain, notify_execution_layer)
|
||||
}
|
||||
|
||||
fn envelope(&self) -> &Arc<SignedExecutionPayloadEnvelope<T::EthSpec>> {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -248,7 +248,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
///
|
||||
/// ## Errors
|
||||
///
|
||||
/// Returns an `Err` if the given envelope was invalid, or an error was encountered during
|
||||
/// Returns an `Err` if the given envelope was invalid, or an error was encountered during verification.
|
||||
pub async fn verify_envelope_for_gossip(
|
||||
self: &Arc<Self>,
|
||||
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||
|
||||
@@ -10,14 +10,14 @@ use types::{BeaconState, BlockImportSource, Hash256, SignedBeaconBlock, Slot};
|
||||
|
||||
use super::{
|
||||
AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, EnvelopeImportData,
|
||||
ExecutedEnvelope, IntoExecutionPendingEnvelope, MaybeAvailableEnvelope,
|
||||
ExecutedEnvelope, gossip_verified_envelope::GossipVerifiedEnvelope,
|
||||
};
|
||||
use crate::{
|
||||
AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes,
|
||||
NotifyExecutionLayer,
|
||||
block_verification_types::{AsBlock, AvailableBlockData},
|
||||
metrics,
|
||||
payload_envelope_verification::ExecutionPendingEnvelope,
|
||||
payload_envelope_verification::{ExecutionPendingEnvelope, MaybeAvailableEnvelope},
|
||||
validator_monitor::{get_slot_delay_ms, timestamp_now},
|
||||
};
|
||||
use eth2::types::{EventKind, SseExecutionPayloadAvailable};
|
||||
@@ -26,25 +26,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// Returns `Ok(block_root)` if the given `unverified_envelope` was successfully verified and
|
||||
/// imported into the chain.
|
||||
///
|
||||
/// Items that implement `IntoExecutionPendingEnvelope` include:
|
||||
///
|
||||
/// - `GossipVerifiedEnvelope`
|
||||
/// - TODO(gloas) implement for envelopes recieved over RPC
|
||||
///
|
||||
/// ## Errors
|
||||
///
|
||||
/// Returns an `Err` if the given block was invalid, or an error was encountered during
|
||||
/// Returns an `Err` if the given payload envelope was invalid, or an error was encountered during
|
||||
/// verification.
|
||||
#[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))]
|
||||
pub async fn process_execution_payload_envelope<P: IntoExecutionPendingEnvelope<T>>(
|
||||
pub async fn process_execution_payload_envelope(
|
||||
self: &Arc<Self>,
|
||||
block_root: Hash256,
|
||||
unverified_envelope: P,
|
||||
unverified_envelope: GossipVerifiedEnvelope<T>,
|
||||
notify_execution_layer: NotifyExecutionLayer,
|
||||
block_source: BlockImportSource,
|
||||
publish_fn: impl FnOnce() -> Result<(), EnvelopeError>,
|
||||
) -> Result<AvailabilityProcessingStatus, EnvelopeError> {
|
||||
let block_slot = unverified_envelope.envelope().slot();
|
||||
let block_slot = unverified_envelope.signed_envelope.slot();
|
||||
|
||||
// Set observed time if not already set. Usually this should be set by gossip or RPC,
|
||||
// but just in case we set it again here (useful for tests).
|
||||
@@ -110,9 +105,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
};
|
||||
|
||||
// Verify and import the block.
|
||||
// Verify and import the payload envelope.
|
||||
match import_envelope.await {
|
||||
// The block was successfully verified and imported. Yay.
|
||||
// The payload envelope was successfully verified and imported. Yay.
|
||||
Ok(status @ AvailabilityProcessingStatus::Imported(block_root)) => {
|
||||
info!(
|
||||
?block_root,
|
||||
@@ -126,7 +121,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
Ok(status)
|
||||
}
|
||||
Ok(status @ AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
|
||||
debug!(?block_root, %slot, "Beacon block awaiting blobs");
|
||||
debug!(?block_root, %slot, "Payload envelope awaiting blobs");
|
||||
|
||||
Ok(status)
|
||||
}
|
||||
@@ -139,7 +134,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
// There was an error whilst attempting to verify and import the block. The block might
|
||||
// There was an error whilst attempting to verify and import the payload envelope. It might
|
||||
// be partially verified or partially imported.
|
||||
crit!(
|
||||
error = ?e,
|
||||
@@ -149,7 +144,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
};
|
||||
Err(EnvelopeError::BeaconChainError(e))
|
||||
}
|
||||
// The block failed verification.
|
||||
// The payload envelope failed verification.
|
||||
Err(other) => {
|
||||
warn!(
|
||||
reason = other.to_string(),
|
||||
@@ -165,7 +160,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
///
|
||||
/// An error is returned if the verification handle couldn't be awaited.
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
pub async fn into_executed_payload_envelope(
|
||||
async fn into_executed_payload_envelope(
|
||||
self: Arc<Self>,
|
||||
pending_envelope: ExecutionPendingEnvelope<T::EthSpec>,
|
||||
) -> Result<ExecutedEnvelope<T::EthSpec>, EnvelopeError> {
|
||||
@@ -278,6 +273,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
return Err(EnvelopeError::BlockRootUnknown { block_root });
|
||||
}
|
||||
|
||||
|
||||
// TODO(gloas) no fork choice logic yet
|
||||
// Take an exclusive write-lock on fork choice. It's very important to prevent deadlocks by
|
||||
// avoiding taking other locks whilst holding this lock.
|
||||
let mut fork_choice = parking_lot::RwLockUpgradableReadGuard::upgrade(fork_choice_reader);
|
||||
@@ -349,6 +346,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
error = ?e,
|
||||
"Database write failed!"
|
||||
);
|
||||
return Err(e.into());
|
||||
// TODO(gloas) handle db write failure
|
||||
// return Err(self
|
||||
// .handle_import_block_db_write_error(fork_choice)
|
||||
|
||||
@@ -3,27 +3,18 @@
|
||||
//! types, starting at a `SignedExecutionPayloadEnvelope` and finishing with an `AvailableExecutedEnvelope` (see
|
||||
//! diagram below).
|
||||
//!
|
||||
//! // TODO(gloas) we might want to update this diagram to include `AvailabelExecutedEnvelope`
|
||||
//! ```ignore
|
||||
//! START
|
||||
//! |
|
||||
//! ▼
|
||||
//! SignedExecutionPayloadEnvelope
|
||||
//! |
|
||||
//! |---------------
|
||||
//! | |
|
||||
//! | ▼
|
||||
//! | GossipVerifiedEnvelope
|
||||
//! | |
|
||||
//! |---------------
|
||||
//! ▼
|
||||
//! GossipVerifiedEnvelope
|
||||
//! |
|
||||
//! ▼
|
||||
//! ExecutionPendingEnvelope
|
||||
//! |
|
||||
//! await
|
||||
//! |
|
||||
//! ▼
|
||||
//! END
|
||||
//! ExecutedEnvelope
|
||||
//!
|
||||
//! ```
|
||||
|
||||
@@ -48,7 +39,7 @@ pub mod gossip_verified_envelope;
|
||||
pub mod import;
|
||||
mod payload_notifier;
|
||||
|
||||
pub use execution_pending_envelope::{ExecutionPendingEnvelope, IntoExecutionPendingEnvelope};
|
||||
pub use execution_pending_envelope::ExecutionPendingEnvelope;
|
||||
|
||||
#[derive(PartialEq)]
|
||||
pub struct EnvelopeImportData<E: EthSpec> {
|
||||
@@ -63,7 +54,7 @@ pub struct AvailableEnvelope<E: EthSpec> {
|
||||
execution_block_hash: ExecutionBlockHash,
|
||||
envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
|
||||
columns: DataColumnSidecarList<E>,
|
||||
/// Timestamp at which this block first became available (UNIX timestamp, time since 1970).
|
||||
/// Timestamp at which this envelope first became available (UNIX timestamp, time since 1970).
|
||||
columns_available_timestamp: Option<std::time::Duration>,
|
||||
pub spec: Arc<ChainSpec>,
|
||||
}
|
||||
@@ -111,7 +102,7 @@ pub enum MaybeAvailableEnvelope<E: EthSpec> {
|
||||
},
|
||||
}
|
||||
|
||||
/// This snapshot is to be used for verifying a envelope of the block.
|
||||
/// This snapshot is to be used for verifying a payload envelope.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EnvelopeProcessingSnapshot<E: EthSpec> {
|
||||
/// This state is equivalent to the `self.beacon_block.state_root()` before applying the envelope.
|
||||
@@ -183,54 +174,41 @@ impl<E: EthSpec> AvailableExecutedEnvelope<E> {
|
||||
#[derive(Debug)]
|
||||
pub enum EnvelopeError {
|
||||
/// The envelope's block root is unknown.
|
||||
BlockRootUnknown {
|
||||
block_root: Hash256,
|
||||
},
|
||||
BlockRootUnknown { block_root: Hash256 },
|
||||
/// The signature is invalid.
|
||||
BadSignature,
|
||||
/// The builder index doesn't match the committed bid
|
||||
BuilderIndexMismatch {
|
||||
committed_bid: u64,
|
||||
envelope: u64,
|
||||
},
|
||||
// The envelope slot doesn't match the block
|
||||
SlotMismatch {
|
||||
block: Slot,
|
||||
envelope: Slot,
|
||||
},
|
||||
// The validator index is unknown
|
||||
UnknownValidator {
|
||||
builder_index: u64,
|
||||
},
|
||||
// The block hash doesn't match the committed bid
|
||||
BuilderIndexMismatch { committed_bid: u64, envelope: u64 },
|
||||
/// The envelope slot doesn't match the block
|
||||
SlotMismatch { block: Slot, envelope: Slot },
|
||||
/// The validator index is unknown
|
||||
UnknownValidator { builder_index: u64 },
|
||||
/// The block hash doesn't match the committed bid
|
||||
BlockHashMismatch {
|
||||
committed_bid: ExecutionBlockHash,
|
||||
envelope: ExecutionBlockHash,
|
||||
},
|
||||
// The block's proposer_index does not match the locally computed proposer
|
||||
IncorrectBlockProposer {
|
||||
block: u64,
|
||||
local_shuffling: u64,
|
||||
},
|
||||
// The slot belongs to a block that is from a slot prior than
|
||||
// the most recently finalized slot
|
||||
/// The block's proposer_index does not match the locally computed proposer
|
||||
IncorrectBlockProposer { block: u64, local_shuffling: u64 },
|
||||
/// The slot belongs to a block that is from a slot prior than
|
||||
/// to most recently finalized slot
|
||||
PriorToFinalization {
|
||||
payload_slot: Slot,
|
||||
latest_finalized_slot: Slot,
|
||||
},
|
||||
// Some Beacon Chain Error
|
||||
/// Some Beacon Chain Error
|
||||
BeaconChainError(Arc<BeaconChainError>),
|
||||
// Some Beacon State error
|
||||
/// Some Beacon State error
|
||||
BeaconStateError(BeaconStateError),
|
||||
// Some BlockProcessingError (for electra operations)
|
||||
/// Some BlockProcessingError (for electra operations)
|
||||
BlockProcessingError(BlockProcessingError),
|
||||
// Some EnvelopeProcessingError
|
||||
/// Some EnvelopeProcessingError
|
||||
EnvelopeProcessingError(EnvelopeProcessingError),
|
||||
// Error verifying the execution payload
|
||||
/// Error verifying the execution payload
|
||||
ExecutionPayloadError(ExecutionPayloadError),
|
||||
// An error from block-level checks reused during envelope import
|
||||
/// An error from block-level checks reused during envelope import
|
||||
BlockError(BlockError),
|
||||
// Internal error
|
||||
/// Internal error
|
||||
InternalError(String),
|
||||
}
|
||||
|
||||
|
||||
@@ -41,7 +41,8 @@
|
||||
pub use crate::scheduler::BeaconProcessorQueueLengths;
|
||||
use crate::scheduler::work_queue::WorkQueues;
|
||||
use crate::work_reprocessing_queue::{
|
||||
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
|
||||
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipEnvelope,
|
||||
ReprocessQueueMessage,
|
||||
};
|
||||
use futures::stream::{Stream, StreamExt};
|
||||
use futures::task::Poll;
|
||||
@@ -242,6 +243,18 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
|
||||
process_fn,
|
||||
},
|
||||
},
|
||||
ReadyWork::Envelope(QueuedGossipEnvelope {
|
||||
beacon_block_slot,
|
||||
beacon_block_root,
|
||||
process_fn,
|
||||
}) => Self {
|
||||
drop_during_sync: false,
|
||||
work: Work::DelayedImportEnvelope {
|
||||
beacon_block_slot,
|
||||
beacon_block_root,
|
||||
process_fn,
|
||||
},
|
||||
},
|
||||
ReadyWork::RpcBlock(QueuedRpcBlock {
|
||||
beacon_block_root,
|
||||
process_fn,
|
||||
@@ -384,6 +397,11 @@ pub enum Work<E: EthSpec> {
|
||||
beacon_block_root: Hash256,
|
||||
process_fn: AsyncFn,
|
||||
},
|
||||
DelayedImportEnvelope {
|
||||
beacon_block_slot: Slot,
|
||||
beacon_block_root: Hash256,
|
||||
process_fn: AsyncFn,
|
||||
},
|
||||
GossipVoluntaryExit(BlockingFn),
|
||||
GossipProposerSlashing(BlockingFn),
|
||||
GossipAttesterSlashing(BlockingFn),
|
||||
@@ -447,6 +465,7 @@ pub enum WorkType {
|
||||
GossipBlobSidecar,
|
||||
GossipDataColumnSidecar,
|
||||
DelayedImportBlock,
|
||||
DelayedImportEnvelope,
|
||||
GossipVoluntaryExit,
|
||||
GossipProposerSlashing,
|
||||
GossipAttesterSlashing,
|
||||
@@ -498,6 +517,7 @@ impl<E: EthSpec> Work<E> {
|
||||
Work::GossipBlobSidecar(_) => WorkType::GossipBlobSidecar,
|
||||
Work::GossipDataColumnSidecar(_) => WorkType::GossipDataColumnSidecar,
|
||||
Work::DelayedImportBlock { .. } => WorkType::DelayedImportBlock,
|
||||
Work::DelayedImportEnvelope { .. } => WorkType::DelayedImportEnvelope,
|
||||
Work::GossipVoluntaryExit(_) => WorkType::GossipVoluntaryExit,
|
||||
Work::GossipProposerSlashing(_) => WorkType::GossipProposerSlashing,
|
||||
Work::GossipAttesterSlashing(_) => WorkType::GossipAttesterSlashing,
|
||||
@@ -793,6 +813,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
// on the delayed ones.
|
||||
} else if let Some(item) = work_queues.delayed_block_queue.pop() {
|
||||
Some(item)
|
||||
} else if let Some(item) = work_queues.delayed_envelope_queue.pop() {
|
||||
Some(item)
|
||||
// Check gossip blocks and payloads before gossip attestations, since a block might be
|
||||
// required to verify some attestations.
|
||||
} else if let Some(item) = work_queues.gossip_block_queue.pop() {
|
||||
@@ -1111,6 +1133,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
Work::DelayedImportBlock { .. } => {
|
||||
work_queues.delayed_block_queue.push(work, work_id)
|
||||
}
|
||||
Work::DelayedImportEnvelope { .. } => {
|
||||
work_queues.delayed_envelope_queue.push(work, work_id)
|
||||
}
|
||||
Work::GossipVoluntaryExit { .. } => {
|
||||
work_queues.gossip_voluntary_exit_queue.push(work, work_id)
|
||||
}
|
||||
@@ -1238,6 +1263,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
work_queues.gossip_data_column_queue.len()
|
||||
}
|
||||
WorkType::DelayedImportBlock => work_queues.delayed_block_queue.len(),
|
||||
WorkType::DelayedImportEnvelope => work_queues.delayed_envelope_queue.len(),
|
||||
WorkType::GossipVoluntaryExit => {
|
||||
work_queues.gossip_voluntary_exit_queue.len()
|
||||
}
|
||||
@@ -1435,6 +1461,11 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
beacon_block_slot: _,
|
||||
beacon_block_root: _,
|
||||
process_fn,
|
||||
}
|
||||
| Work::DelayedImportEnvelope {
|
||||
beacon_block_slot: _,
|
||||
beacon_block_root: _,
|
||||
process_fn,
|
||||
} => task_spawner.spawn_async(process_fn),
|
||||
Work::RpcBlock {
|
||||
process_fn,
|
||||
|
||||
@@ -127,6 +127,7 @@ pub struct BeaconProcessorQueueLengths {
|
||||
gossip_blob_queue: usize,
|
||||
gossip_data_column_queue: usize,
|
||||
delayed_block_queue: usize,
|
||||
delayed_envelope_queue: usize,
|
||||
status_queue: usize,
|
||||
block_brange_queue: usize,
|
||||
block_broots_queue: usize,
|
||||
@@ -197,6 +198,7 @@ impl BeaconProcessorQueueLengths {
|
||||
gossip_blob_queue: 1024,
|
||||
gossip_data_column_queue: 1024,
|
||||
delayed_block_queue: 1024,
|
||||
delayed_envelope_queue: 1024,
|
||||
status_queue: 1024,
|
||||
block_brange_queue: 1024,
|
||||
block_broots_queue: 1024,
|
||||
@@ -250,6 +252,7 @@ pub struct WorkQueues<E: EthSpec> {
|
||||
pub gossip_blob_queue: FifoQueue<Work<E>>,
|
||||
pub gossip_data_column_queue: FifoQueue<Work<E>>,
|
||||
pub delayed_block_queue: FifoQueue<Work<E>>,
|
||||
pub delayed_envelope_queue: FifoQueue<Work<E>>,
|
||||
pub status_queue: FifoQueue<Work<E>>,
|
||||
pub block_brange_queue: FifoQueue<Work<E>>,
|
||||
pub block_broots_queue: FifoQueue<Work<E>>,
|
||||
@@ -315,6 +318,7 @@ impl<E: EthSpec> WorkQueues<E> {
|
||||
let gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue);
|
||||
let gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue);
|
||||
let delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue);
|
||||
let delayed_envelope_queue = FifoQueue::new(queue_lengths.delayed_envelope_queue);
|
||||
|
||||
let status_queue = FifoQueue::new(queue_lengths.status_queue);
|
||||
let block_brange_queue = FifoQueue::new(queue_lengths.block_brange_queue);
|
||||
@@ -375,6 +379,7 @@ impl<E: EthSpec> WorkQueues<E> {
|
||||
gossip_blob_queue,
|
||||
gossip_data_column_queue,
|
||||
delayed_block_queue,
|
||||
delayed_envelope_queue,
|
||||
status_queue,
|
||||
block_brange_queue,
|
||||
block_broots_queue,
|
||||
|
||||
@@ -35,6 +35,7 @@ use types::{EthSpec, Hash256, Slot};
|
||||
|
||||
const TASK_NAME: &str = "beacon_processor_reprocess_queue";
|
||||
const GOSSIP_BLOCKS: &str = "gossip_blocks";
|
||||
const GOSSIP_ENVELOPES: &str = "gossip_envelopes";
|
||||
const RPC_BLOCKS: &str = "rpc_blocks";
|
||||
const ATTESTATIONS: &str = "attestations";
|
||||
const ATTESTATIONS_PER_ROOT: &str = "attestations_per_root";
|
||||
@@ -51,6 +52,10 @@ pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);
|
||||
/// For how long to queue light client updates for re-processing.
|
||||
pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);
|
||||
|
||||
/// Envelope timeout as a multiplier of slot duration. Envelopes waiting for their block will be
|
||||
/// sent for processing after this many slots worth of time, even if the block hasn't arrived.
|
||||
const QUEUED_ENVELOPE_DELAY_SLOTS: u32 = 1;
|
||||
|
||||
/// For how long to queue rpc blocks before sending them back for reprocessing.
|
||||
pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4);
|
||||
|
||||
@@ -65,6 +70,9 @@ pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150);
|
||||
/// it's nice to have extra protection.
|
||||
const MAXIMUM_QUEUED_BLOCKS: usize = 16;
|
||||
|
||||
/// Set an arbitrary upper-bound on the number of queued envelopes to avoid DoS attacks.
|
||||
const MAXIMUM_QUEUED_ENVELOPES: usize = 16;
|
||||
|
||||
/// How many attestations we keep before new ones get dropped.
|
||||
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;
|
||||
|
||||
@@ -93,6 +101,8 @@ pub const RECONSTRUCTION_DEADLINE: (u64, u64) = (1, 4);
|
||||
pub enum ReprocessQueueMessage {
|
||||
/// A block that has been received early and we should queue for later processing.
|
||||
EarlyBlock(QueuedGossipBlock),
|
||||
/// An execution payload envelope that references a block not yet in fork choice.
|
||||
UnknownBlockEnvelope(QueuedGossipEnvelope),
|
||||
/// A gossip block for hash `X` is being imported, we should queue the rpc block for the same
|
||||
/// hash until the gossip block is imported.
|
||||
RpcBlock(QueuedRpcBlock),
|
||||
@@ -120,6 +130,7 @@ pub enum ReprocessQueueMessage {
|
||||
/// Events sent by the scheduler once they are ready for re-processing.
|
||||
pub enum ReadyWork {
|
||||
Block(QueuedGossipBlock),
|
||||
Envelope(QueuedGossipEnvelope),
|
||||
RpcBlock(QueuedRpcBlock),
|
||||
IgnoredRpcBlock(IgnoredRpcBlock),
|
||||
Unaggregate(QueuedUnaggregate),
|
||||
@@ -157,6 +168,13 @@ pub struct QueuedGossipBlock {
|
||||
pub process_fn: AsyncFn,
|
||||
}
|
||||
|
||||
/// An execution payload envelope that arrived early and has been queued for later import.
|
||||
pub struct QueuedGossipEnvelope {
|
||||
pub beacon_block_slot: Slot,
|
||||
pub beacon_block_root: Hash256,
|
||||
pub process_fn: AsyncFn,
|
||||
}
|
||||
|
||||
/// A block that arrived for processing when the same block was being imported over gossip.
|
||||
/// It is queued for later import.
|
||||
pub struct QueuedRpcBlock {
|
||||
@@ -209,6 +227,8 @@ impl<E: EthSpec> From<QueuedBackfillBatch> for WorkEvent<E> {
|
||||
enum InboundEvent {
|
||||
/// A gossip block that was queued for later processing and is ready for import.
|
||||
ReadyGossipBlock(QueuedGossipBlock),
|
||||
/// An envelope whose block has been imported and is now ready for processing.
|
||||
ReadyEnvelope(Hash256),
|
||||
/// A rpc block that was queued because the same gossip block was being imported
|
||||
/// will now be retried for import.
|
||||
ReadyRpcBlock(QueuedRpcBlock),
|
||||
@@ -234,6 +254,8 @@ struct ReprocessQueue<S> {
|
||||
/* Queues */
|
||||
/// Queue to manage scheduled early blocks.
|
||||
gossip_block_delay_queue: DelayQueue<QueuedGossipBlock>,
|
||||
/// Queue to manage envelope timeouts (keyed by block root).
|
||||
envelope_delay_queue: DelayQueue<Hash256>,
|
||||
/// Queue to manage scheduled early blocks.
|
||||
rpc_block_delay_queue: DelayQueue<QueuedRpcBlock>,
|
||||
/// Queue to manage scheduled attestations.
|
||||
@@ -246,6 +268,8 @@ struct ReprocessQueue<S> {
|
||||
/* Queued items */
|
||||
/// Queued blocks.
|
||||
queued_gossip_block_roots: HashSet<Hash256>,
|
||||
/// Queued envelopes awaiting their block, keyed by block root.
|
||||
awaiting_envelopes_per_root: HashMap<Hash256, (QueuedGossipEnvelope, DelayKey)>,
|
||||
/// Queued aggregated attestations.
|
||||
queued_aggregates: FnvHashMap<usize, (QueuedAggregate, DelayKey)>,
|
||||
/// Queued attestations.
|
||||
@@ -266,6 +290,7 @@ struct ReprocessQueue<S> {
|
||||
next_attestation: usize,
|
||||
next_lc_update: usize,
|
||||
early_block_debounce: TimeLatch,
|
||||
envelope_delay_debounce: TimeLatch,
|
||||
rpc_block_debounce: TimeLatch,
|
||||
attestation_delay_debounce: TimeLatch,
|
||||
lc_update_delay_debounce: TimeLatch,
|
||||
@@ -315,6 +340,13 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
|
||||
Poll::Ready(None) | Poll::Pending => (),
|
||||
}
|
||||
|
||||
match self.envelope_delay_queue.poll_expired(cx) {
|
||||
Poll::Ready(Some(block_root)) => {
|
||||
return Poll::Ready(Some(InboundEvent::ReadyEnvelope(block_root.into_inner())));
|
||||
}
|
||||
Poll::Ready(None) | Poll::Pending => (),
|
||||
}
|
||||
|
||||
match self.rpc_block_delay_queue.poll_expired(cx) {
|
||||
Poll::Ready(Some(queued_block)) => {
|
||||
return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner())));
|
||||
@@ -418,11 +450,13 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
work_reprocessing_rx,
|
||||
ready_work_tx,
|
||||
gossip_block_delay_queue: DelayQueue::new(),
|
||||
envelope_delay_queue: DelayQueue::new(),
|
||||
rpc_block_delay_queue: DelayQueue::new(),
|
||||
attestations_delay_queue: DelayQueue::new(),
|
||||
lc_updates_delay_queue: DelayQueue::new(),
|
||||
column_reconstructions_delay_queue: DelayQueue::new(),
|
||||
queued_gossip_block_roots: HashSet::new(),
|
||||
awaiting_envelopes_per_root: HashMap::new(),
|
||||
queued_lc_updates: FnvHashMap::default(),
|
||||
queued_aggregates: FnvHashMap::default(),
|
||||
queued_unaggregates: FnvHashMap::default(),
|
||||
@@ -433,6 +467,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
next_attestation: 0,
|
||||
next_lc_update: 0,
|
||||
early_block_debounce: TimeLatch::default(),
|
||||
envelope_delay_debounce: TimeLatch::default(),
|
||||
rpc_block_debounce: TimeLatch::default(),
|
||||
attestation_delay_debounce: TimeLatch::default(),
|
||||
lc_update_delay_debounce: TimeLatch::default(),
|
||||
@@ -498,6 +533,39 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
}
|
||||
}
|
||||
}
|
||||
// An envelope that references an unknown block. Queue it until the block is
|
||||
// imported, or until the timeout expires.
|
||||
InboundEvent::Msg(UnknownBlockEnvelope(queued_envelope)) => {
|
||||
let block_root = queued_envelope.beacon_block_root;
|
||||
|
||||
// Don't add the same envelope to the queue twice. This prevents DoS attacks.
|
||||
if self.awaiting_envelopes_per_root.contains_key(&block_root) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Check to ensure this won't over-fill the queue.
|
||||
if self.awaiting_envelopes_per_root.len() >= MAXIMUM_QUEUED_ENVELOPES {
|
||||
if self.envelope_delay_debounce.elapsed() {
|
||||
warn!(
|
||||
queue_size = MAXIMUM_QUEUED_ENVELOPES,
|
||||
msg = "system resources may be saturated",
|
||||
"Envelope delay queue is full"
|
||||
);
|
||||
}
|
||||
// Drop the envelope.
|
||||
return;
|
||||
}
|
||||
|
||||
// Register the timeout.
|
||||
let delay_key = self.envelope_delay_queue.insert(
|
||||
block_root,
|
||||
self.slot_clock.slot_duration() * QUEUED_ENVELOPE_DELAY_SLOTS,
|
||||
);
|
||||
|
||||
// Store the envelope keyed by block root.
|
||||
self.awaiting_envelopes_per_root
|
||||
.insert(block_root, (queued_envelope, delay_key));
|
||||
}
|
||||
// A rpc block arrived for processing at the same time when a gossip block
|
||||
// for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY`
|
||||
// and then send the rpc block back for processing assuming the gossip import
|
||||
@@ -647,6 +715,23 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
block_root,
|
||||
parent_root,
|
||||
}) => {
|
||||
// Unqueue the envelope we have for this root, if any.
|
||||
if let Some((envelope, delay_key)) =
|
||||
self.awaiting_envelopes_per_root.remove(&block_root)
|
||||
{
|
||||
self.envelope_delay_queue.remove(&delay_key);
|
||||
if self
|
||||
.ready_work_tx
|
||||
.try_send(ReadyWork::Envelope(envelope))
|
||||
.is_err()
|
||||
{
|
||||
error!(
|
||||
?block_root,
|
||||
"Failed to send envelope for reprocessing after block import"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Unqueue the attestations we have for this root, if any.
|
||||
if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) {
|
||||
let mut sent_count = 0;
|
||||
@@ -802,6 +887,21 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
error!("Failed to pop queued block");
|
||||
}
|
||||
}
|
||||
// An envelope's timeout has expired. Send it for processing regardless of
|
||||
// whether the block has been imported.
|
||||
InboundEvent::ReadyEnvelope(block_root) => {
|
||||
if let Some((envelope, _delay_key)) =
|
||||
self.awaiting_envelopes_per_root.remove(&block_root)
|
||||
{
|
||||
if self
|
||||
.ready_work_tx
|
||||
.try_send(ReadyWork::Envelope(envelope))
|
||||
.is_err()
|
||||
{
|
||||
error!(?block_root, "Failed to send envelope after timeout");
|
||||
}
|
||||
}
|
||||
}
|
||||
InboundEvent::ReadyAttestation(queued_id) => {
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,
|
||||
@@ -941,6 +1041,11 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
&[GOSSIP_BLOCKS],
|
||||
self.gossip_block_delay_queue.len() as i64,
|
||||
);
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
|
||||
&[GOSSIP_ENVELOPES],
|
||||
self.awaiting_envelopes_per_root.len() as i64,
|
||||
);
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
|
||||
&[RPC_BLOCKS],
|
||||
|
||||
@@ -49,8 +49,8 @@ use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction;
|
||||
use beacon_processor::{
|
||||
DuplicateCache, GossipAggregatePackage, GossipAttestationBatch,
|
||||
work_reprocessing_queue::{
|
||||
QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate,
|
||||
ReprocessQueueMessage,
|
||||
QueuedAggregate, QueuedGossipBlock, QueuedGossipEnvelope, QueuedLightClientUpdate,
|
||||
QueuedUnaggregate, ReprocessQueueMessage,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -3347,7 +3347,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
// TODO(gloas) update metrics to note how early the envelope arrived
|
||||
|
||||
let inner_self = self.clone();
|
||||
let _process_fn = Box::pin(async move {
|
||||
let process_fn = Box::pin(async move {
|
||||
inner_self
|
||||
.process_gossip_verified_execution_payload_envelope(
|
||||
peer_id,
|
||||
@@ -3356,7 +3356,27 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
.await;
|
||||
});
|
||||
|
||||
// TODO(gloas) send to reprocess queue
|
||||
if self
|
||||
.beacon_processor_send
|
||||
.try_send(WorkEvent {
|
||||
drop_during_sync: false,
|
||||
work: Work::Reprocess(ReprocessQueueMessage::UnknownBlockEnvelope(
|
||||
QueuedGossipEnvelope {
|
||||
beacon_block_slot: envelope_slot,
|
||||
beacon_block_root,
|
||||
process_fn,
|
||||
},
|
||||
)),
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
error!(
|
||||
%envelope_slot,
|
||||
?beacon_block_root,
|
||||
location = "envelope gossip",
|
||||
"Failed to defer envelope import"
|
||||
);
|
||||
}
|
||||
None
|
||||
}
|
||||
Ok(_) => Some(verified_envelope),
|
||||
|
||||
@@ -10,7 +10,6 @@ use std::fmt::Debug;
|
||||
use std::future::Future;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -622,8 +621,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
|
||||
)
|
||||
.await?;
|
||||
|
||||
sleep(Duration::from_secs(4));
|
||||
|
||||
// TODO(gloas) we only need to fetch, sign and publish the envelope in the local building case.
|
||||
// Right now we always default to local building. Once we implement trustless/trusted builder logic
|
||||
// we should check the bid for index == BUILDER_INDEX_SELF_BUILD
|
||||
|
||||
Reference in New Issue
Block a user