mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-10 01:26:44 +00:00
Fix sending partials from immediately complete columns (#9433)
The initial partial send after getBlobs only sends incomplete partial columns, causing issues as we do not propagate these columns. Return complete and incomplete columns from `get_columns_and_mark_as_local_fetched`, and convert any complete columns to partials if necessary. Send all columns retrieved this way after getBlobs Co-Authored-By: Daniel Knopik <daniel@dknopik.de>
This commit is contained in:
@@ -204,14 +204,16 @@ impl<E: EthSpec> PartialDataColumnAssembler<E> {
|
||||
.cloned()
|
||||
}
|
||||
|
||||
/// Get all current partials for a block for publishing after fetching local blobs.
|
||||
/// To unlock future publishing, mark blobs as fetched locally.
|
||||
/// We do this within one write lock to avoid useless double publishes.
|
||||
pub fn get_partials_and_mark_as_local_fetched(
|
||||
/// Get all current columns for a block (complete *and* incomplete) for publishing after
|
||||
/// fetching local blobs.
|
||||
///
|
||||
/// To unlock future publishing, mark blobs as fetched locally. We do this within one write
|
||||
/// lock to avoid useless double publishes.
|
||||
pub fn get_columns_and_mark_as_local_fetched(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
header: &Arc<PartialDataColumnHeader<E>>,
|
||||
) -> Vec<KzgVerifiedCustodyPartialDataColumn<E>> {
|
||||
) -> Vec<AssemblyColumn<E>> {
|
||||
let mut assemblies = self.assemblies.write();
|
||||
let assembly = assemblies.get_or_insert_mut(block_root, || PartialAssembly {
|
||||
header: header.clone(),
|
||||
@@ -221,17 +223,7 @@ impl<E: EthSpec> PartialDataColumnAssembler<E> {
|
||||
|
||||
assembly.has_local_blobs = true;
|
||||
|
||||
assembly
|
||||
.columns
|
||||
.values()
|
||||
.filter_map(|value| {
|
||||
if let AssemblyColumn::Incomplete(partial) = value {
|
||||
Some(partial.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
assembly.columns.values().cloned().collect()
|
||||
}
|
||||
|
||||
/// Get header for a block if we have an active assembly
|
||||
@@ -473,6 +465,37 @@ mod tests {
|
||||
assert_eq!(result.updated_partials[0].index(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn get_columns_returns_complete_and_incomplete() {
|
||||
let assembler = make_assembler();
|
||||
let root = Hash256::repeat_byte(1);
|
||||
let header = Arc::new(make_header(4));
|
||||
|
||||
// One complete column (all cells present) and one still-incomplete column.
|
||||
let complete = make_partial(root, 0, 4, &[0, 1, 2, 3]);
|
||||
let incomplete = make_partial(root, 1, 4, &[0, 1]);
|
||||
assembler.merge_partials(root, vec![complete, incomplete], header.clone());
|
||||
|
||||
// Both must be returned for seeding. Previously the complete column was dropped, so it was
|
||||
// published as an empty placeholder and never served to peers.
|
||||
let columns = assembler.get_columns_and_mark_as_local_fetched(root, &header);
|
||||
assert_eq!(columns.len(), 2);
|
||||
assert_eq!(
|
||||
columns
|
||||
.iter()
|
||||
.filter(|c| matches!(c, AssemblyColumn::Complete(_)))
|
||||
.count(),
|
||||
1
|
||||
);
|
||||
assert_eq!(
|
||||
columns
|
||||
.iter()
|
||||
.filter(|c| matches!(c, AssemblyColumn::Incomplete(_)))
|
||||
.count(),
|
||||
1
|
||||
);
|
||||
}
|
||||
|
||||
// -- mark_as_complete tests --
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -6,6 +6,7 @@ use beacon_chain::data_column_verification::{
|
||||
GossipDataColumnError, KzgVerifiedCustodyDataColumn, observe_gossip_data_column,
|
||||
};
|
||||
use beacon_chain::fetch_blobs::{FetchEngineBlobError, fetch_and_process_engine_blobs};
|
||||
use beacon_chain::partial_data_column_assembler::AssemblyColumn;
|
||||
use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType};
|
||||
use beacon_chain::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError};
|
||||
use beacon_processor::{
|
||||
@@ -972,14 +973,35 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
// Publish partial columns without eager send
|
||||
// TODO(gloas): implement publish partial columns without eager send
|
||||
if let Some(assembler) = self.chain.data_availability_checker.partial_assembler() {
|
||||
let columns = assembler.get_partials_and_mark_as_local_fetched(block_root, &header);
|
||||
let columns = assembler.get_columns_and_mark_as_local_fetched(block_root, &header);
|
||||
// Republish both complete and incomplete columns as partials
|
||||
let columns: Vec<_> = columns
|
||||
.into_iter()
|
||||
.filter_map(|column| match column {
|
||||
AssemblyColumn::Incomplete(partial) => Some(partial.into_inner()),
|
||||
AssemblyColumn::Complete(full) => {
|
||||
let DataColumnSidecar::Fulu(fulu) = full.as_data_column() else {
|
||||
return None;
|
||||
};
|
||||
match fulu.to_partial() {
|
||||
Ok(partial) => Some(Arc::new(partial)),
|
||||
Err(err) => {
|
||||
error!(
|
||||
%block_root,
|
||||
column_index = %full.index(),
|
||||
?err,
|
||||
"Failed to convert complete column to partial for re-seeding"
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
if !columns.is_empty() {
|
||||
debug!(block = %block_root, "Publishing all partials after getBlobs");
|
||||
self.send_network_message(NetworkMessage::PublishPartialColumns {
|
||||
columns: columns
|
||||
.into_iter()
|
||||
.map(|partial| partial.into_inner())
|
||||
.collect(),
|
||||
columns,
|
||||
header,
|
||||
});
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user