mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-17 03:42:46 +00:00
light client optimistic update reprocessing (#3799)
Currently there is a race between receiving blocks and receiving light client optimistic updates (in unstable), which results in processing errors. This is a continuation of PR #3693 and seeks to progress on issue #3651 Add the parent_root to ReprocessQueueMessage::BlockImported so we can remove blocks from queue when a block arrives that has the same parent root. We use the parent root as opposed to the block_root because the LightClientOptimisticUpdate does not contain the block_root. If light_client_optimistic_update.attested_header.canonical_root() != head_block.message().parent_root() then we queue the update. Otherwise we process immediately. michaelsproul came up with this idea. The code was heavily based off of the attestation reprocessing. I have not properly tested this to see if it works as intended.
This commit is contained in:
committed by
realbigsean
parent
9b5c2eefd5
commit
f857811e5f
@@ -70,7 +70,8 @@ use types::{
|
||||
SyncCommitteeMessage, SyncSubnetId,
|
||||
};
|
||||
use work_reprocessing_queue::{
|
||||
spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork,
|
||||
spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
|
||||
QueuedUnaggregate, ReadyWork,
|
||||
};
|
||||
|
||||
use worker::{Toolbox, Worker};
|
||||
@@ -144,6 +145,10 @@ const MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN: usize = 1_024;
|
||||
/// before we start dropping them.
|
||||
const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024;
|
||||
|
||||
/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored
|
||||
/// for reprocessing before we start dropping them.
|
||||
const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128;
|
||||
|
||||
/// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping
|
||||
/// them.
|
||||
const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048;
|
||||
@@ -233,6 +238,7 @@ pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request";
|
||||
pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap";
|
||||
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
|
||||
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
|
||||
pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update";
|
||||
pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";
|
||||
|
||||
/// A simple first-in-first-out queue with a maximum length.
|
||||
@@ -781,6 +787,21 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
|
||||
seen_timestamp,
|
||||
},
|
||||
},
|
||||
ReadyWork::LightClientUpdate(QueuedLightClientUpdate {
|
||||
peer_id,
|
||||
message_id,
|
||||
light_client_optimistic_update,
|
||||
seen_timestamp,
|
||||
..
|
||||
}) => Self {
|
||||
drop_during_sync: true,
|
||||
work: Work::UnknownLightClientOptimisticUpdate {
|
||||
message_id,
|
||||
peer_id,
|
||||
light_client_optimistic_update,
|
||||
seen_timestamp,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -820,6 +841,12 @@ pub enum Work<T: BeaconChainTypes> {
|
||||
aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
},
|
||||
UnknownLightClientOptimisticUpdate {
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
light_client_optimistic_update: Box<LightClientOptimisticUpdate<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
},
|
||||
GossipAggregateBatch {
|
||||
packages: Vec<GossipAggregatePackage<T::EthSpec>>,
|
||||
},
|
||||
@@ -957,6 +984,7 @@ impl<T: BeaconChainTypes> Work<T> {
|
||||
Work::LightClientBootstrapRequest { .. } => LIGHT_CLIENT_BOOTSTRAP_REQUEST,
|
||||
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
|
||||
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
|
||||
Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE,
|
||||
Work::GossipBlsToExecutionChange { .. } => GOSSIP_BLS_TO_EXECUTION_CHANGE,
|
||||
}
|
||||
}
|
||||
@@ -1092,6 +1120,8 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
// Using a FIFO queue for light client updates to maintain sequence order.
|
||||
let mut finality_update_queue = FifoQueue::new(MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN);
|
||||
let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN);
|
||||
let mut unknown_light_client_update_queue =
|
||||
FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN);
|
||||
|
||||
// Using a FIFO queue since blocks need to be imported sequentially.
|
||||
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
|
||||
@@ -1483,6 +1513,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
Work::UnknownBlockAggregate { .. } => {
|
||||
unknown_block_aggregate_queue.push(work)
|
||||
}
|
||||
Work::UnknownLightClientOptimisticUpdate { .. } => {
|
||||
unknown_light_client_update_queue.push(work, work_id, &self.log)
|
||||
}
|
||||
Work::GossipBlsToExecutionChange { .. } => {
|
||||
gossip_bls_to_execution_change_queue.push(work, work_id, &self.log)
|
||||
}
|
||||
@@ -1848,6 +1881,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
message_id,
|
||||
peer_id,
|
||||
*light_client_optimistic_update,
|
||||
Some(work_reprocessing_tx),
|
||||
seen_timestamp,
|
||||
)
|
||||
}),
|
||||
@@ -1998,6 +2032,20 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
|
||||
seen_timestamp,
|
||||
)
|
||||
}),
|
||||
Work::UnknownLightClientOptimisticUpdate {
|
||||
message_id,
|
||||
peer_id,
|
||||
light_client_optimistic_update,
|
||||
seen_timestamp,
|
||||
} => task_spawner.spawn_blocking(move || {
|
||||
worker.process_gossip_optimistic_update(
|
||||
message_id,
|
||||
peer_id,
|
||||
*light_client_optimistic_update,
|
||||
None,
|
||||
seen_timestamp,
|
||||
)
|
||||
}),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user