Convert RpcBlock to an enum that indicates availability (#8424)

Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>

Co-Authored-By: Mark Mackey <mark@sigmaprime.io>

Co-Authored-By: Eitan Seri-Levi <eserilev@gmail.com>

Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
Eitan Seri-Levi
2026-01-27 21:59:32 -08:00
committed by GitHub
parent c4409cdf28
commit f7b5c7ee3f
23 changed files with 1368 additions and 579 deletions

View File

@@ -1,5 +1,9 @@
use beacon_chain::{
block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root,
BeaconChainTypes,
block_verification_types::{AvailableBlockData, RpcBlock},
data_availability_checker::DataAvailabilityChecker,
data_column_verification::CustodyDataColumn,
get_block_root,
};
use lighthouse_network::{
PeerId,
@@ -192,19 +196,26 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
/// Returns `None` if not all expected requests have completed.
/// Returns `Some(Ok(_))` with valid RPC blocks if all data is present and valid.
/// Returns `Some(Err(_))` if there are issues coupling blocks with their data.
pub fn responses(
pub fn responses<T>(
&mut self,
spec: &ChainSpec,
) -> Option<Result<Vec<RpcBlock<E>>, CouplingError>> {
da_checker: Arc<DataAvailabilityChecker<T>>,
spec: Arc<ChainSpec>,
) -> Option<Result<Vec<RpcBlock<E>>, CouplingError>>
where
T: BeaconChainTypes<EthSpec = E>,
{
let Some(blocks) = self.blocks_request.to_finished() else {
return None;
};
// Increment the attempt once this function returns the response or errors
match &mut self.block_data_request {
RangeBlockDataRequest::NoData => {
Some(Self::responses_with_blobs(blocks.to_vec(), vec![], spec))
}
RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs(
blocks.to_vec(),
vec![],
da_checker,
spec,
)),
RangeBlockDataRequest::Blobs(request) => {
let Some(blobs) = request.to_finished() else {
return None;
@@ -212,6 +223,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
Some(Self::responses_with_blobs(
blocks.to_vec(),
blobs.to_vec(),
da_checker,
spec,
))
}
@@ -248,6 +260,8 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
column_to_peer_id,
expected_custody_columns,
*attempt,
da_checker,
spec,
);
if let Err(CouplingError::DataColumnPeerFailure {
@@ -269,11 +283,15 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
}
}
fn responses_with_blobs(
fn responses_with_blobs<T>(
blocks: Vec<Arc<SignedBeaconBlock<E>>>,
blobs: Vec<Arc<BlobSidecar<E>>>,
spec: &ChainSpec,
) -> Result<Vec<RpcBlock<E>>, CouplingError> {
da_checker: Arc<DataAvailabilityChecker<T>>,
spec: Arc<ChainSpec>,
) -> Result<Vec<RpcBlock<E>>, CouplingError>
where
T: BeaconChainTypes<EthSpec = E>,
{
// There can't be more more blobs than blocks. i.e. sending any blob (empty
// included) for a skipped slot is not permitted.
let mut responses = Vec::with_capacity(blocks.len());
@@ -315,8 +333,9 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
.map_err(|_| {
CouplingError::BlobPeerFailure("Blobs returned exceeds max length".to_string())
})?;
let block_data = AvailableBlockData::new_with_blobs(blobs);
responses.push(
RpcBlock::new(None, block, Some(blobs))
RpcBlock::new(block, Some(block_data), &da_checker, spec.clone())
.map_err(|e| CouplingError::BlobPeerFailure(format!("{e:?}")))?,
)
}
@@ -333,13 +352,18 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
Ok(responses)
}
fn responses_with_custody_columns(
fn responses_with_custody_columns<T>(
blocks: Vec<Arc<SignedBeaconBlock<E>>>,
data_columns: DataColumnSidecarList<E>,
column_to_peer: HashMap<u64, PeerId>,
expects_custody_columns: &[ColumnIndex],
attempt: usize,
) -> Result<Vec<RpcBlock<E>>, CouplingError> {
da_checker: Arc<DataAvailabilityChecker<T>>,
spec: Arc<ChainSpec>,
) -> Result<Vec<RpcBlock<E>>, CouplingError>
where
T: BeaconChainTypes<EthSpec = E>,
{
// Group data columns by block_root and index
let mut data_columns_by_block =
HashMap::<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>::new();
@@ -415,11 +439,14 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
);
}
RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns)
let block_data = AvailableBlockData::new_with_data_columns(custody_columns.iter().map(|c| c.as_data_column().clone()).collect::<Vec<_>>());
RpcBlock::new(block, Some(block_data), &da_checker, spec.clone())
.map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
} else {
// Block has no data, expects zero columns
RpcBlock::new_without_blobs(Some(block_root), block)
RpcBlock::new(block, Some(AvailableBlockData::NoData), &da_checker, spec.clone())
.map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
});
}
@@ -459,10 +486,13 @@ impl<I: PartialEq + std::fmt::Display, T> ByRangeRequest<I, T> {
#[cfg(test)]
mod tests {
use super::RangeBlockComponentsRequest;
use crate::sync::network_context::MAX_COLUMN_RETRIES;
use super::RangeBlockComponentsRequest;
use beacon_chain::custody_context::NodeCustodyType;
use beacon_chain::test_utils::{
NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec,
NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns,
test_da_checker, test_spec,
};
use lighthouse_network::{
PeerId,
@@ -472,7 +502,7 @@ mod tests {
},
};
use rand::SeedableRng;
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use tracing::Span;
use types::{Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock, test_utils::XorShiftRng};
@@ -512,8 +542,9 @@ mod tests {
}
fn is_finished(info: &mut RangeBlockComponentsRequest<E>) -> bool {
let spec = test_spec::<E>();
info.responses(&spec).is_some()
let spec = Arc::new(test_spec::<E>());
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
info.responses(da_checker, spec).is_some()
}
#[test]
@@ -534,8 +565,11 @@ mod tests {
// Send blocks and complete terminate response
info.add_blocks(blocks_req_id, blocks).unwrap();
let spec = Arc::new(test_spec::<E>());
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
// Assert response is finished and RpcBlocks can be constructed
info.responses(&test_spec::<E>()).unwrap().unwrap();
info.responses(da_checker, spec).unwrap().unwrap();
}
#[test]
@@ -565,16 +599,26 @@ mod tests {
// Expect no blobs returned
info.add_blobs(blobs_req_id, vec![]).unwrap();
// Assert response is finished and RpcBlocks can be constructed, even if blobs weren't returned.
// This makes sure we don't expect blobs here when they have expired. Checking this logic should
// be hendled elsewhere.
info.responses(&test_spec::<E>()).unwrap().unwrap();
let mut spec = test_spec::<E>();
spec.deneb_fork_epoch = Some(Epoch::new(0));
let spec = Arc::new(spec);
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
// Assert response is finished and RpcBlocks cannot be constructed, because blobs weren't returned.
let result = info.responses(da_checker, spec).unwrap();
assert!(result.is_err())
}
#[test]
fn rpc_block_with_custody_columns() {
let spec = test_spec::<E>();
let expects_custody_columns = vec![1, 2, 3, 4];
let mut spec = test_spec::<E>();
spec.deneb_fork_epoch = Some(Epoch::new(0));
spec.fulu_fork_epoch = Some(Epoch::new(0));
let spec = Arc::new(spec);
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
let expects_custody_columns = da_checker
.custody_context()
.sampling_columns_for_epoch(Epoch::new(0), &spec)
.to_vec();
let mut rng = XorShiftRng::from_seed([42; 16]);
let blocks = (0..4)
.map(|_| {
@@ -638,18 +682,26 @@ mod tests {
}
// All completed construct response
info.responses(&spec).unwrap().unwrap();
info.responses(da_checker, spec).unwrap().unwrap();
}
#[test]
fn rpc_block_with_custody_columns_batched() {
let spec = test_spec::<E>();
let batched_column_requests = [vec![1_u64, 2], vec![3, 4]];
let expects_custody_columns = batched_column_requests
.iter()
.flatten()
.cloned()
.collect::<Vec<_>>();
let mut spec = test_spec::<E>();
spec.deneb_fork_epoch = Some(Epoch::new(0));
spec.fulu_fork_epoch = Some(Epoch::new(0));
let spec = Arc::new(spec);
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
let expected_sampling_columns = da_checker
.custody_context()
.sampling_columns_for_epoch(Epoch::new(0), &spec)
.to_vec();
// Split sampling columns into two batches
let mid = expected_sampling_columns.len() / 2;
let batched_column_requests = [
expected_sampling_columns[..mid].to_vec(),
expected_sampling_columns[mid..].to_vec(),
];
let custody_column_request_ids =
(0..batched_column_requests.len() as u32).collect::<Vec<_>>();
let num_of_data_column_requests = custody_column_request_ids.len();
@@ -673,7 +725,7 @@ mod tests {
let mut info = RangeBlockComponentsRequest::<E>::new(
blocks_req_id,
None,
Some((columns_req_id.clone(), expects_custody_columns.clone())),
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
Span::none(),
);
@@ -723,14 +775,18 @@ mod tests {
}
// All completed construct response
info.responses(&spec).unwrap().unwrap();
info.responses(da_checker, spec).unwrap().unwrap();
}
#[test]
fn missing_custody_columns_from_faulty_peers() {
// GIVEN: A request expecting custody columns from multiple peers
let spec = test_spec::<E>();
let expected_custody_columns = vec![1, 2, 3, 4];
// GIVEN: A request expecting sampling columns from multiple peers
let spec = Arc::new(test_spec::<E>());
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
let expected_sampling_columns = da_checker
.custody_context()
.sampling_columns_for_epoch(Epoch::new(0), &spec)
.to_vec();
let mut rng = XorShiftRng::from_seed([42; 16]);
let blocks = (0..2)
.map(|_| {
@@ -745,7 +801,7 @@ mod tests {
let components_id = components_id();
let blocks_req_id = blocks_id(components_id);
let columns_req_id = expected_custody_columns
let columns_req_id = expected_sampling_columns
.iter()
.enumerate()
.map(|(i, column)| {
@@ -761,7 +817,7 @@ mod tests {
let mut info = RangeBlockComponentsRequest::<E>::new(
blocks_req_id,
None,
Some((columns_req_id.clone(), expected_custody_columns.clone())),
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
Span::none(),
);
@@ -772,8 +828,8 @@ mod tests {
)
.unwrap();
// AND: Only some custody columns are received (columns 1 and 2)
for (i, &column_index) in expected_custody_columns.iter().take(2).enumerate() {
// AND: Only the first 2 sampling columns are received successfully
for (i, &column_index) in expected_sampling_columns.iter().take(2).enumerate() {
let (req, _columns) = columns_req_id.get(i).unwrap();
info.add_custody_columns(
*req,
@@ -786,13 +842,13 @@ mod tests {
}
// AND: Remaining column requests are completed with empty data (simulating faulty peers)
for i in 2..4 {
for i in 2..expected_sampling_columns.len() {
let (req, _columns) = columns_req_id.get(i).unwrap();
info.add_custody_columns(*req, vec![]).unwrap();
}
// WHEN: Attempting to construct RPC blocks
let result = info.responses(&spec).unwrap();
let result = info.responses(da_checker, spec).unwrap();
// THEN: Should fail with PeerFailure identifying the faulty peers
assert!(result.is_err());
@@ -803,9 +859,13 @@ mod tests {
}) = result
{
assert!(error.contains("Peers did not return column"));
assert_eq!(faulty_peers.len(), 2); // columns 3 and 4 missing
assert_eq!(faulty_peers[0].0, 3); // column index 3
assert_eq!(faulty_peers[1].0, 4); // column index 4
// All columns after the first 2 should be reported as faulty
let expected_faulty_count = expected_sampling_columns.len() - 2;
assert_eq!(faulty_peers.len(), expected_faulty_count);
// Verify the faulty column indices match
for (i, (column_index, _peer)) in faulty_peers.iter().enumerate() {
assert_eq!(*column_index, expected_sampling_columns[i + 2]);
}
assert!(!exceeded_retries); // First attempt, should be false
} else {
panic!("Expected PeerFailure error");
@@ -814,9 +874,16 @@ mod tests {
#[test]
fn retry_logic_after_peer_failures() {
// GIVEN: A request expecting custody columns where some peers initially fail
let spec = test_spec::<E>();
let expected_custody_columns = vec![1, 2];
// GIVEN: A request expecting sampling columns where some peers initially fail
let mut spec = test_spec::<E>();
spec.deneb_fork_epoch = Some(Epoch::new(0));
spec.fulu_fork_epoch = Some(Epoch::new(0));
let spec = Arc::new(spec);
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
let expected_sampling_columns = da_checker
.custody_context()
.sampling_columns_for_epoch(Epoch::new(0), &spec)
.to_vec();
let mut rng = XorShiftRng::from_seed([42; 16]);
let blocks = (0..2)
.map(|_| {
@@ -831,7 +898,7 @@ mod tests {
let components_id = components_id();
let blocks_req_id = blocks_id(components_id);
let columns_req_id = expected_custody_columns
let columns_req_id = expected_sampling_columns
.iter()
.enumerate()
.map(|(i, column)| {
@@ -847,7 +914,7 @@ mod tests {
let mut info = RangeBlockComponentsRequest::<E>::new(
blocks_req_id,
None,
Some((columns_req_id.clone(), expected_custody_columns.clone())),
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
Span::none(),
);
@@ -858,46 +925,61 @@ mod tests {
)
.unwrap();
// AND: Only partial custody columns are received (column 1 but not 2)
let (req1, _) = columns_req_id.first().unwrap();
// AND: Only partial sampling columns are received (first column but not others)
let (req0, _) = columns_req_id.first().unwrap();
info.add_custody_columns(
*req1,
*req0,
blocks
.iter()
.flat_map(|b| b.1.iter().filter(|d| *d.index() == 1).cloned())
.flat_map(|b| {
b.1.iter()
.filter(|d| *d.index() == expected_sampling_columns[0])
.cloned()
})
.collect(),
)
.unwrap();
// AND: The missing column request is completed with empty data (peer failure)
let (req2, _) = columns_req_id.get(1).unwrap();
info.add_custody_columns(*req2, vec![]).unwrap();
// AND: The remaining column requests are completed with empty data (peer failure)
for i in 1..expected_sampling_columns.len() {
let (req, _) = columns_req_id.get(i).unwrap();
info.add_custody_columns(*req, vec![]).unwrap();
}
// WHEN: First attempt to get responses fails
let result = info.responses(&spec).unwrap();
let result: Result<
Vec<beacon_chain::block_verification_types::RpcBlock<E>>,
crate::sync::block_sidecar_coupling::CouplingError,
> = info.responses(da_checker.clone(), spec.clone()).unwrap();
assert!(result.is_err());
// AND: We retry with a new peer for the failed column
// AND: We retry with a new peer for the failed columns
let new_columns_req_id = columns_id(
10 as Id,
DataColumnsByRangeRequester::ComponentsByRange(components_id),
);
let failed_column_requests = vec![(new_columns_req_id, vec![2])];
info.reinsert_failed_column_requests(failed_column_requests)
.unwrap();
for column in &expected_sampling_columns[1..] {
let failed_column_requests = vec![(new_columns_req_id, vec![*column])];
info.reinsert_failed_column_requests(failed_column_requests)
.unwrap();
}
// AND: The new peer provides the missing column data
let failed_column_indices: Vec<_> = expected_sampling_columns[1..].to_vec();
info.add_custody_columns(
new_columns_req_id,
blocks
.iter()
.flat_map(|b| b.1.iter().filter(|d| *d.index() == 2).cloned())
.flat_map(|b| {
b.1.iter()
.filter(|d| failed_column_indices.contains(d.index()))
.cloned()
})
.collect(),
)
.unwrap();
// WHEN: Attempting to get responses again
let result = info.responses(&spec).unwrap();
let result = info.responses(da_checker, spec).unwrap();
// THEN: Should succeed with complete RPC blocks
assert!(result.is_ok());
@@ -908,8 +990,15 @@ mod tests {
#[test]
fn max_retries_exceeded_behavior() {
// GIVEN: A request where peers consistently fail to provide required columns
let spec = test_spec::<E>();
let expected_custody_columns = vec![1, 2];
let mut spec = test_spec::<E>();
spec.deneb_fork_epoch = Some(Epoch::new(0));
spec.fulu_fork_epoch = Some(Epoch::new(0));
let spec = Arc::new(spec);
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
let expected_sampling_columns = da_checker
.custody_context()
.sampling_columns_for_epoch(Epoch::new(0), &spec)
.to_vec();
let mut rng = XorShiftRng::from_seed([42; 16]);
let blocks = (0..1)
.map(|_| {
@@ -924,7 +1013,7 @@ mod tests {
let components_id = components_id();
let blocks_req_id = blocks_id(components_id);
let columns_req_id = expected_custody_columns
let columns_req_id = expected_sampling_columns
.iter()
.enumerate()
.map(|(i, column)| {
@@ -940,7 +1029,7 @@ mod tests {
let mut info = RangeBlockComponentsRequest::<E>::new(
blocks_req_id,
None,
Some((columns_req_id.clone(), expected_custody_columns.clone())),
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
Span::none(),
);
@@ -951,24 +1040,30 @@ mod tests {
)
.unwrap();
// AND: Only partial custody columns are provided (column 1 but not 2)
let (req1, _) = columns_req_id.first().unwrap();
// AND: Only the first sampling column is provided successfully
let (req0, _) = columns_req_id.first().unwrap();
info.add_custody_columns(
*req1,
*req0,
blocks
.iter()
.flat_map(|b| b.1.iter().filter(|d| *d.index() == 1).cloned())
.flat_map(|b| {
b.1.iter()
.filter(|d| *d.index() == expected_sampling_columns[0])
.cloned()
})
.collect(),
)
.unwrap();
// AND: Column 2 request completes with empty data (persistent peer failure)
let (req2, _) = columns_req_id.get(1).unwrap();
info.add_custody_columns(*req2, vec![]).unwrap();
// AND: All other column requests complete with empty data (persistent peer failure)
for i in 1..expected_sampling_columns.len() {
let (req, _) = columns_req_id.get(i).unwrap();
info.add_custody_columns(*req, vec![]).unwrap();
}
// WHEN: Multiple retry attempts are made (up to max retries)
for _ in 0..MAX_COLUMN_RETRIES {
let result = info.responses(&spec).unwrap();
let result = info.responses(da_checker.clone(), spec.clone()).unwrap();
assert!(result.is_err());
if let Err(super::CouplingError::DataColumnPeerFailure {
@@ -981,7 +1076,7 @@ mod tests {
}
// AND: One final attempt after exceeding max retries
let result = info.responses(&spec).unwrap();
let result = info.responses(da_checker, spec).unwrap();
// THEN: Should fail with exceeded_retries = true
assert!(result.is_err());
@@ -991,8 +1086,16 @@ mod tests {
exceeded_retries,
}) = result
{
assert_eq!(faulty_peers.len(), 1); // column 2 missing
assert_eq!(faulty_peers[0].0, 2); // column index 2
// All columns except the first one should be faulty
let expected_faulty_count = expected_sampling_columns.len() - 1;
assert_eq!(faulty_peers.len(), expected_faulty_count);
let mut faulty_peers = faulty_peers.into_iter().collect::<HashMap<u64, PeerId>>();
// Only the columns that failed (indices 1..N) should be in faulty_peers
for column in &expected_sampling_columns[1..] {
faulty_peers.remove(column);
}
assert!(faulty_peers.is_empty());
assert!(exceeded_retries); // Should be true after max retries
} else {
panic!("Expected PeerFailure error with exceeded_retries=true");