mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-08 09:16:00 +00:00
Keep PendingComponents in da_checker during import_block (#5845)
* Ensure lookup sync checks caches correctly * Simplify BlockProcessStatus * Keep PendingComponents in da_checker during import_block * Merge branch 'unstable' of https://github.com/sigp/lighthouse into time_in_da_checker * Fix tests with DA checker new eviction policy (#34)
This commit is contained in:
@@ -3336,6 +3336,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
"payload_verification_handle",
|
"payload_verification_handle",
|
||||||
)
|
)
|
||||||
.await??;
|
.await??;
|
||||||
|
|
||||||
|
// Remove block components from da_checker AFTER completing block import. Then we can assert
|
||||||
|
// the following invariant:
|
||||||
|
// > A valid unfinalized block is either in fork-choice or da_checker.
|
||||||
|
//
|
||||||
|
// If we remove the block when it becomes available, there's some time window during
|
||||||
|
// `import_block` where the block is nowhere. Consumers of the da_checker can handle the
|
||||||
|
// extend time a block may exist in the da_checker.
|
||||||
|
//
|
||||||
|
// If `import_block` errors (only errors with internal errors), the pending components will
|
||||||
|
// be pruned on data_availability_checker maintenance as finality advances.
|
||||||
|
self.data_availability_checker
|
||||||
|
.remove_pending_components(block_root);
|
||||||
|
|
||||||
Ok(AvailabilityProcessingStatus::Imported(block_root))
|
Ok(AvailabilityProcessingStatus::Imported(block_root))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -164,6 +164,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
|||||||
.put_pending_executed_block(executed_block)
|
.put_pending_executed_block(executed_block)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn remove_pending_components(&self, block_root: Hash256) {
|
||||||
|
self.availability_cache
|
||||||
|
.remove_pending_components(block_root)
|
||||||
|
}
|
||||||
|
|
||||||
/// Verifies kzg commitments for an RpcBlock, returns a `MaybeAvailableBlock` that may
|
/// Verifies kzg commitments for an RpcBlock, returns a `MaybeAvailableBlock` that may
|
||||||
/// include the fully available block.
|
/// include the fully available block.
|
||||||
///
|
///
|
||||||
|
|||||||
@@ -498,6 +498,21 @@ impl<T: BeaconChainTypes> Critical<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Removes and returns the pending_components corresponding to
|
||||||
|
/// the `block_root` or `None` if it does not exist
|
||||||
|
pub fn remove_pending_components(&mut self, block_root: Hash256) {
|
||||||
|
match self.in_memory.pop_entry(&block_root) {
|
||||||
|
Some { .. } => {}
|
||||||
|
None => {
|
||||||
|
// not in memory, is it in the store?
|
||||||
|
// We don't need to remove the data from the store as we have removed it from
|
||||||
|
// `store_keys` so we won't go looking for it on disk. The maintenance thread
|
||||||
|
// will remove it from disk the next time it runs.
|
||||||
|
self.store_keys.remove(&block_root);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the number of pending component entries in memory.
|
/// Returns the number of pending component entries in memory.
|
||||||
pub fn num_blocks(&self) -> usize {
|
pub fn num_blocks(&self) -> usize {
|
||||||
self.in_memory.len()
|
self.in_memory.len()
|
||||||
@@ -607,6 +622,11 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
|
|||||||
pending_components.merge_blobs(fixed_blobs);
|
pending_components.merge_blobs(fixed_blobs);
|
||||||
|
|
||||||
if pending_components.is_available() {
|
if pending_components.is_available() {
|
||||||
|
write_lock.put_pending_components(
|
||||||
|
block_root,
|
||||||
|
pending_components.clone(),
|
||||||
|
&self.overflow_store,
|
||||||
|
)?;
|
||||||
// No need to hold the write lock anymore
|
// No need to hold the write lock anymore
|
||||||
drop(write_lock);
|
drop(write_lock);
|
||||||
pending_components.make_available(|diet_block| {
|
pending_components.make_available(|diet_block| {
|
||||||
@@ -646,6 +666,11 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
|
|||||||
|
|
||||||
// Check if we have all components and entire set is consistent.
|
// Check if we have all components and entire set is consistent.
|
||||||
if pending_components.is_available() {
|
if pending_components.is_available() {
|
||||||
|
write_lock.put_pending_components(
|
||||||
|
block_root,
|
||||||
|
pending_components.clone(),
|
||||||
|
&self.overflow_store,
|
||||||
|
)?;
|
||||||
// No need to hold the write lock anymore
|
// No need to hold the write lock anymore
|
||||||
drop(write_lock);
|
drop(write_lock);
|
||||||
pending_components.make_available(|diet_block| {
|
pending_components.make_available(|diet_block| {
|
||||||
@@ -661,6 +686,10 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn remove_pending_components(&self, block_root: Hash256) {
|
||||||
|
self.critical.write().remove_pending_components(block_root);
|
||||||
|
}
|
||||||
|
|
||||||
/// write all in memory objects to disk
|
/// write all in memory objects to disk
|
||||||
pub fn write_all_to_disk(&self) -> Result<(), AvailabilityCheckError> {
|
pub fn write_all_to_disk(&self) -> Result<(), AvailabilityCheckError> {
|
||||||
let maintenance_lock = self.maintenance_lock.lock();
|
let maintenance_lock = self.maintenance_lock.lock();
|
||||||
@@ -1195,10 +1224,17 @@ mod test {
|
|||||||
matches!(availability, Availability::Available(_)),
|
matches!(availability, Availability::Available(_)),
|
||||||
"block doesn't have blobs, should be available"
|
"block doesn't have blobs, should be available"
|
||||||
);
|
);
|
||||||
|
assert_eq!(
|
||||||
|
cache.critical.read().in_memory.len(),
|
||||||
|
1,
|
||||||
|
"cache should still have block as it hasn't been imported yet"
|
||||||
|
);
|
||||||
|
// remove the blob to simulate successful import
|
||||||
|
cache.remove_pending_components(root);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
cache.critical.read().in_memory.len(),
|
cache.critical.read().in_memory.len(),
|
||||||
0,
|
0,
|
||||||
"cache should be empty because we don't have blobs"
|
"cache should be empty now that block has been imported"
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
assert!(
|
assert!(
|
||||||
@@ -1263,6 +1299,12 @@ mod test {
|
|||||||
"block should be available: {:?}",
|
"block should be available: {:?}",
|
||||||
availability
|
availability
|
||||||
);
|
);
|
||||||
|
assert!(
|
||||||
|
cache.critical.read().in_memory.len() == 1,
|
||||||
|
"cache should still have available block until import"
|
||||||
|
);
|
||||||
|
// remove the blob to simulate successful import
|
||||||
|
cache.remove_pending_components(root);
|
||||||
assert!(
|
assert!(
|
||||||
cache.critical.read().in_memory.is_empty(),
|
cache.critical.read().in_memory.is_empty(),
|
||||||
"cache should be empty now that all components available"
|
"cache should be empty now that all components available"
|
||||||
@@ -1378,6 +1420,8 @@ mod test {
|
|||||||
.expect("should put blob");
|
.expect("should put blob");
|
||||||
if blob_index == expected_blobs - 1 {
|
if blob_index == expected_blobs - 1 {
|
||||||
assert!(matches!(availability, Availability::Available(_)));
|
assert!(matches!(availability, Availability::Available(_)));
|
||||||
|
// remove the block from the cache to simulate import
|
||||||
|
cache.remove_pending_components(roots[0]);
|
||||||
} else {
|
} else {
|
||||||
// the first block should be brought back into memory
|
// the first block should be brought back into memory
|
||||||
assert!(
|
assert!(
|
||||||
|
|||||||
Reference in New Issue
Block a user