mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-21 23:08:23 +00:00
Gloas serve envelope rpc (#8896)
Serves envelope by range and by root requests. Added PayloadEnvelopeStreamer so that we dont need to alter upstream code when we introduce blinded payload envelopes. Co-Authored-By: Eitan Seri- Levi <eserilev@gmail.com> Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu> Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>
This commit is contained in:
@@ -0,0 +1,42 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
#[cfg(test)]
|
||||
use mockall::automock;
|
||||
use task_executor::TaskExecutor;
|
||||
use types::{Hash256, SignedExecutionPayloadEnvelope, Slot};
|
||||
|
||||
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
|
||||
|
||||
/// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing envelope streamer logic.
|
||||
pub(crate) struct EnvelopeStreamerBeaconAdapter<T: BeaconChainTypes> {
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
}
|
||||
|
||||
#[cfg_attr(test, automock, allow(dead_code))]
|
||||
impl<T: BeaconChainTypes> EnvelopeStreamerBeaconAdapter<T> {
|
||||
pub(crate) fn new(chain: Arc<BeaconChain<T>>) -> Self {
|
||||
Self { chain }
|
||||
}
|
||||
|
||||
pub(crate) fn executor(&self) -> &TaskExecutor {
|
||||
&self.chain.task_executor
|
||||
}
|
||||
|
||||
pub(crate) fn get_payload_envelope(
|
||||
&self,
|
||||
root: &Hash256,
|
||||
) -> Result<Option<SignedExecutionPayloadEnvelope<T::EthSpec>>, store::Error> {
|
||||
self.chain.store.get_payload_envelope(root)
|
||||
}
|
||||
|
||||
pub(crate) fn get_split_slot(&self) -> Slot {
|
||||
self.chain.store.get_split_info().slot
|
||||
}
|
||||
|
||||
pub(crate) fn block_has_canonical_payload(
|
||||
&self,
|
||||
root: &Hash256,
|
||||
) -> Result<bool, BeaconChainError> {
|
||||
self.chain.canonical_head.block_has_canonical_payload(root)
|
||||
}
|
||||
}
|
||||
219
beacon_node/beacon_chain/src/payload_envelope_streamer/mod.rs
Normal file
219
beacon_node/beacon_chain/src/payload_envelope_streamer/mod.rs
Normal file
@@ -0,0 +1,219 @@
|
||||
mod beacon_chain_adapter;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
#[cfg_attr(test, double)]
|
||||
use crate::payload_envelope_streamer::beacon_chain_adapter::EnvelopeStreamerBeaconAdapter;
|
||||
use futures::Stream;
|
||||
#[cfg(test)]
|
||||
use mockall_double::double;
|
||||
use tokio::sync::mpsc::{self, UnboundedSender};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
use tracing::{debug, error, warn};
|
||||
use types::{EthSpec, Hash256, SignedExecutionPayloadEnvelope};
|
||||
|
||||
#[cfg(not(test))]
|
||||
use crate::BeaconChain;
|
||||
use crate::{BeaconChainError, BeaconChainTypes};
|
||||
|
||||
type PayloadEnvelopeResult<E> =
|
||||
Result<Option<Arc<SignedExecutionPayloadEnvelope<E>>>, BeaconChainError>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
BlockMissingFromForkChoice,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum EnvelopeRequestSource {
|
||||
ByRoot,
|
||||
ByRange,
|
||||
}
|
||||
|
||||
pub struct PayloadEnvelopeStreamer<T: BeaconChainTypes> {
|
||||
adapter: EnvelopeStreamerBeaconAdapter<T>,
|
||||
request_source: EnvelopeRequestSource,
|
||||
}
|
||||
|
||||
// TODO(gloas) eventually we'll need to expand this to support loading blinded payload envelopes from the db
|
||||
// and fetching the execution payload from the EL. See BlockStreamer impl as an example
|
||||
impl<T: BeaconChainTypes> PayloadEnvelopeStreamer<T> {
|
||||
pub(crate) fn new(
|
||||
adapter: EnvelopeStreamerBeaconAdapter<T>,
|
||||
request_source: EnvelopeRequestSource,
|
||||
) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
adapter,
|
||||
request_source,
|
||||
})
|
||||
}
|
||||
|
||||
// TODO(gloas) simply a stub impl for now. Should check some exec payload envelope cache
|
||||
// and return the envelope if it exists in the cache
|
||||
fn check_payload_envelope_cache(
|
||||
&self,
|
||||
_beacon_block_root: &Hash256,
|
||||
) -> Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>> {
|
||||
// if self.check_caches == CheckCaches::Yes
|
||||
None
|
||||
}
|
||||
|
||||
fn load_envelope(
|
||||
self: &Arc<Self>,
|
||||
beacon_block_root: &Hash256,
|
||||
) -> Result<Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>, BeaconChainError> {
|
||||
if let Some(cached_envelope) = self.check_payload_envelope_cache(beacon_block_root) {
|
||||
Ok(Some(cached_envelope))
|
||||
} else {
|
||||
// TODO(gloas) we'll want to use the execution layer directly to call
|
||||
// the engine api method eth_getPayloadBodiesByRange()
|
||||
match self.adapter.get_payload_envelope(beacon_block_root) {
|
||||
Ok(opt_envelope) => Ok(opt_envelope.map(Arc::new)),
|
||||
Err(e) => Err(BeaconChainError::DBError(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn load_envelopes(
|
||||
self: &Arc<Self>,
|
||||
block_roots: &[Hash256],
|
||||
) -> Result<Vec<(Hash256, PayloadEnvelopeResult<T::EthSpec>)>, BeaconChainError> {
|
||||
let streamer = self.clone();
|
||||
let block_roots = block_roots.to_vec();
|
||||
let split_slot = streamer.adapter.get_split_slot();
|
||||
// Loading from the DB is slow -> spawn a blocking task
|
||||
self.adapter
|
||||
.executor()
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let mut results: Vec<(Hash256, PayloadEnvelopeResult<T::EthSpec>)> = Vec::new();
|
||||
for root in block_roots.iter() {
|
||||
// TODO(gloas) we are loading the full envelope from the db.
|
||||
// in a future PR we will only be storing the blinded envelope.
|
||||
// When that happens we'll need to use the EL here to fetch
|
||||
// the payload and reconstruct the non-blinded envelope.
|
||||
let opt_envelope = match streamer.load_envelope(root) {
|
||||
Ok(opt_envelope) => opt_envelope,
|
||||
Err(e) => {
|
||||
results.push((*root, Err(e)));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if streamer.request_source == EnvelopeRequestSource::ByRoot {
|
||||
// No envelope verification required for `ENVELOPE_BY_ROOT` requests.
|
||||
// If we only served envelopes that match our canonical view, nodes
|
||||
// wouldn't be able to sync other branches.
|
||||
results.push((*root, Ok(opt_envelope)));
|
||||
continue;
|
||||
}
|
||||
|
||||
// When loading envelopes on or after the split slot, we must cross reference the bid from the child beacon block.
|
||||
// There can be payloads that have been imported into the hot db but don't match our current view
|
||||
// of the canonical chain.
|
||||
|
||||
if let Some(envelope) = opt_envelope {
|
||||
// Ensure that the envelopes we're serving match our view of the canonical chain.
|
||||
|
||||
// When loading envelopes before the split slot, there is no need to check.
|
||||
// Non-canonical payload envelopes will have already been pruned.
|
||||
if split_slot > envelope.slot() {
|
||||
results.push((*root, Ok(Some(envelope))));
|
||||
continue;
|
||||
}
|
||||
|
||||
match streamer.adapter.block_has_canonical_payload(root) {
|
||||
Ok(is_envelope_canonical) => {
|
||||
if is_envelope_canonical {
|
||||
results.push((*root, Ok(Some(envelope))));
|
||||
} else {
|
||||
results.push((*root, Ok(None)));
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
results.push((
|
||||
*root,
|
||||
Err(BeaconChainError::EnvelopeStreamerError(
|
||||
Error::BlockMissingFromForkChoice,
|
||||
)),
|
||||
));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
results.push((*root, Ok(None)));
|
||||
}
|
||||
}
|
||||
results
|
||||
},
|
||||
"load_execution_payload_envelopes",
|
||||
)
|
||||
.ok_or(BeaconChainError::RuntimeShutdown)?
|
||||
.await
|
||||
.map_err(BeaconChainError::TokioJoin)
|
||||
}
|
||||
|
||||
async fn stream_payload_envelopes(
|
||||
self: Arc<Self>,
|
||||
beacon_block_roots: Vec<Hash256>,
|
||||
sender: UnboundedSender<(Hash256, Arc<PayloadEnvelopeResult<T::EthSpec>>)>,
|
||||
) {
|
||||
let results = match self.load_envelopes(&beacon_block_roots).await {
|
||||
Ok(results) => results,
|
||||
Err(e) => {
|
||||
warn!(error = ?e, "Failed to load payload envelopes");
|
||||
send_errors(&beacon_block_roots, sender, e).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
for (root, result) in results {
|
||||
if sender.send((root, Arc::new(result))).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn launch_stream(
|
||||
self: Arc<Self>,
|
||||
block_roots: Vec<Hash256>,
|
||||
) -> impl Stream<Item = (Hash256, Arc<PayloadEnvelopeResult<T::EthSpec>>)> {
|
||||
let (envelope_tx, envelope_rx) = mpsc::unbounded_channel();
|
||||
debug!(
|
||||
envelopes = block_roots.len(),
|
||||
"Launching a PayloadEnvelopeStreamer"
|
||||
);
|
||||
let executor = self.adapter.executor().clone();
|
||||
executor.spawn(
|
||||
self.stream_payload_envelopes(block_roots, envelope_tx),
|
||||
"get_payload_envelopes_sender",
|
||||
);
|
||||
UnboundedReceiverStream::new(envelope_rx)
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a `PayloadEnvelopeStreamer` from a `BeaconChain` and launch a stream.
|
||||
#[cfg(not(test))]
|
||||
pub fn launch_payload_envelope_stream<T: BeaconChainTypes>(
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
block_roots: Vec<Hash256>,
|
||||
request_source: EnvelopeRequestSource,
|
||||
) -> impl Stream<Item = (Hash256, Arc<PayloadEnvelopeResult<T::EthSpec>>)> {
|
||||
let adapter = beacon_chain_adapter::EnvelopeStreamerBeaconAdapter::new(chain);
|
||||
PayloadEnvelopeStreamer::new(adapter, request_source).launch_stream(block_roots)
|
||||
}
|
||||
|
||||
async fn send_errors<E: EthSpec>(
|
||||
block_roots: &[Hash256],
|
||||
sender: UnboundedSender<(Hash256, Arc<PayloadEnvelopeResult<E>>)>,
|
||||
beacon_chain_error: BeaconChainError,
|
||||
) {
|
||||
let result = Arc::new(Err(beacon_chain_error));
|
||||
for beacon_block_root in block_roots {
|
||||
if sender.send((*beacon_block_root, result.clone())).is_err() {
|
||||
error!("EnvelopeStreamer channel closed unexpectedly");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
386
beacon_node/beacon_chain/src/payload_envelope_streamer/tests.rs
Normal file
386
beacon_node/beacon_chain/src/payload_envelope_streamer/tests.rs
Normal file
@@ -0,0 +1,386 @@
|
||||
use super::*;
|
||||
use crate::payload_envelope_streamer::beacon_chain_adapter::MockEnvelopeStreamerBeaconAdapter;
|
||||
use crate::test_utils::EphemeralHarnessType;
|
||||
use bls::{FixedBytesExtended, Signature};
|
||||
use futures::StreamExt;
|
||||
use std::collections::HashMap;
|
||||
use task_executor::test_utils::TestRuntime;
|
||||
use types::{
|
||||
ExecutionBlockHash, ExecutionPayloadEnvelope, ExecutionPayloadGloas, Hash256, MinimalEthSpec,
|
||||
SignedExecutionPayloadEnvelope, Slot,
|
||||
};
|
||||
|
||||
type E = MinimalEthSpec;
|
||||
type T = EphemeralHarnessType<E>;
|
||||
|
||||
struct SlotEntry {
|
||||
block_root: Hash256,
|
||||
slot: Slot,
|
||||
envelope: Option<SignedExecutionPayloadEnvelope<E>>,
|
||||
non_canonical_envelope: bool,
|
||||
}
|
||||
|
||||
impl SlotEntry {
|
||||
fn expect_envelope(&self, split_slot: Option<Slot>) -> bool {
|
||||
if self.envelope.is_none() {
|
||||
return false;
|
||||
}
|
||||
if !self.non_canonical_envelope {
|
||||
return true;
|
||||
}
|
||||
// Non-canonical envelopes before the split slot are returned
|
||||
// (in production they would have been pruned).
|
||||
split_slot.is_some_and(|s| self.slot < s)
|
||||
}
|
||||
}
|
||||
|
||||
fn roots(chain: &[SlotEntry]) -> Vec<Hash256> {
|
||||
chain.iter().map(|s| s.block_root).collect()
|
||||
}
|
||||
|
||||
/// Build test chain data.
|
||||
fn build_chain(
|
||||
num_slots: u64,
|
||||
skipped_slots: &[u64],
|
||||
missing_envelope_slots: &[u64],
|
||||
non_canonical_envelope_slots: &[u64],
|
||||
) -> Vec<SlotEntry> {
|
||||
let mut chain = Vec::new();
|
||||
for i in 1..=num_slots {
|
||||
if skipped_slots.contains(&i) {
|
||||
continue;
|
||||
}
|
||||
let slot = Slot::new(i);
|
||||
let block_root = Hash256::from_low_u64_be(i);
|
||||
let has_envelope = !missing_envelope_slots.contains(&i);
|
||||
let is_non_canonical = non_canonical_envelope_slots.contains(&i);
|
||||
|
||||
let envelope = if has_envelope {
|
||||
let block_hash = if is_non_canonical {
|
||||
ExecutionBlockHash::from_root(Hash256::repeat_byte(0xFF))
|
||||
} else {
|
||||
ExecutionBlockHash::from_root(Hash256::from_low_u64_be(i))
|
||||
};
|
||||
Some(SignedExecutionPayloadEnvelope {
|
||||
message: ExecutionPayloadEnvelope {
|
||||
payload: ExecutionPayloadGloas {
|
||||
block_hash,
|
||||
..Default::default()
|
||||
},
|
||||
execution_requests: Default::default(),
|
||||
builder_index: 0,
|
||||
beacon_block_root: block_root,
|
||||
slot,
|
||||
state_root: Hash256::zero(),
|
||||
},
|
||||
signature: Signature::empty(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
chain.push(SlotEntry {
|
||||
block_root,
|
||||
slot,
|
||||
envelope,
|
||||
non_canonical_envelope: is_non_canonical,
|
||||
});
|
||||
}
|
||||
chain
|
||||
}
|
||||
|
||||
fn mock_adapter() -> (MockEnvelopeStreamerBeaconAdapter<T>, TestRuntime) {
|
||||
let runtime = TestRuntime::default();
|
||||
let mut mock = MockEnvelopeStreamerBeaconAdapter::default();
|
||||
mock.expect_executor()
|
||||
.return_const(runtime.task_executor.clone());
|
||||
(mock, runtime)
|
||||
}
|
||||
|
||||
/// Configure `get_payload_envelope` to return envelopes from chain data.
|
||||
fn mock_envelopes(mock: &mut MockEnvelopeStreamerBeaconAdapter<T>, chain: &[SlotEntry]) {
|
||||
let envelope_map: HashMap<Hash256, Option<SignedExecutionPayloadEnvelope<E>>> = chain
|
||||
.iter()
|
||||
.map(|entry| (entry.block_root, entry.envelope.clone()))
|
||||
.collect();
|
||||
mock.expect_get_payload_envelope()
|
||||
.returning(move |root| Ok(envelope_map.get(root).cloned().flatten()));
|
||||
}
|
||||
|
||||
/// Configure `block_has_canonical_payload` based on chain's non-canonical entries.
|
||||
fn mock_canonical_head(mock: &mut MockEnvelopeStreamerBeaconAdapter<T>, chain: &[SlotEntry]) {
|
||||
let non_canonical: Vec<Hash256> = chain
|
||||
.iter()
|
||||
.filter(|e| e.non_canonical_envelope)
|
||||
.map(|e| e.block_root)
|
||||
.collect();
|
||||
mock.expect_block_has_canonical_payload()
|
||||
.returning(move |root| Ok(!non_canonical.contains(root)));
|
||||
}
|
||||
|
||||
fn unwrap_result(
|
||||
result: &Arc<PayloadEnvelopeResult<E>>,
|
||||
) -> &Option<Arc<SignedExecutionPayloadEnvelope<E>>> {
|
||||
result
|
||||
.as_ref()
|
||||
.as_ref()
|
||||
.expect("unexpected error in stream result")
|
||||
}
|
||||
|
||||
async fn assert_stream_matches(
|
||||
stream: &mut (impl Stream<Item = (Hash256, Arc<PayloadEnvelopeResult<E>>)> + Unpin),
|
||||
chain: &[SlotEntry],
|
||||
split_slot: Option<Slot>,
|
||||
) {
|
||||
for (i, entry) in chain.iter().enumerate() {
|
||||
let (root, result) = stream
|
||||
.next()
|
||||
.await
|
||||
.unwrap_or_else(|| panic!("stream ended early at index {i}"));
|
||||
assert_eq!(root, entry.block_root, "root mismatch at index {i}");
|
||||
|
||||
let result = unwrap_result(&result);
|
||||
|
||||
if entry.expect_envelope(split_slot) {
|
||||
let envelope = result
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| panic!("expected Some at index {i} but got None"));
|
||||
let expected_envelope = entry.envelope.as_ref().unwrap();
|
||||
assert_eq!(
|
||||
envelope.block_hash(),
|
||||
expected_envelope.block_hash(),
|
||||
"block_hash mismatch at index {i}"
|
||||
);
|
||||
} else {
|
||||
assert!(
|
||||
result.is_none(),
|
||||
"expected None at index {i} (missing or non-canonical), got Some"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
assert!(stream.next().await.is_none(), "stream should be exhausted");
|
||||
}
|
||||
|
||||
/// Happy path: all envelopes exist and are canonical.
|
||||
#[tokio::test]
|
||||
async fn stream_envelopes_by_range() {
|
||||
let chain = build_chain(8, &[], &[], &[]);
|
||||
let (mut mock, _runtime) = mock_adapter();
|
||||
mock.expect_get_split_slot().return_const(Slot::new(0));
|
||||
mock_envelopes(&mut mock, &chain);
|
||||
mock_canonical_head(&mut mock, &chain);
|
||||
|
||||
let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRange);
|
||||
let mut stream = streamer.launch_stream(roots(&chain));
|
||||
assert_stream_matches(&mut stream, &chain, None).await;
|
||||
}
|
||||
|
||||
/// Mixed chain: skipped slots, missing envelopes, and non-canonical envelopes.
|
||||
#[tokio::test]
|
||||
async fn stream_envelopes_by_range_mixed() {
|
||||
let chain = build_chain(12, &[3, 8], &[5], &[7, 11]);
|
||||
let (mut mock, _runtime) = mock_adapter();
|
||||
mock.expect_get_split_slot().return_const(Slot::new(0));
|
||||
mock_envelopes(&mut mock, &chain);
|
||||
mock_canonical_head(&mut mock, &chain);
|
||||
|
||||
let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRange);
|
||||
let mut stream = streamer.launch_stream(roots(&chain));
|
||||
assert_stream_matches(&mut stream, &chain, None).await;
|
||||
}
|
||||
|
||||
/// Non-canonical envelopes before the split slot bypass canonical verification
|
||||
/// and are returned. Non-canonical envelopes after the split slot are filtered out.
|
||||
#[tokio::test]
|
||||
async fn stream_envelopes_by_range_before_split() {
|
||||
// Non-canonical envelopes at slots 2 and 4 (before split), slot 8 (after split).
|
||||
let chain = build_chain(10, &[], &[], &[2, 4, 8]);
|
||||
let split_slot = Slot::new(6);
|
||||
let (mut mock, _runtime) = mock_adapter();
|
||||
mock.expect_get_split_slot().return_const(split_slot);
|
||||
mock_envelopes(&mut mock, &chain);
|
||||
mock_canonical_head(&mut mock, &chain);
|
||||
|
||||
let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRange);
|
||||
let mut stream = streamer.launch_stream(roots(&chain));
|
||||
assert_stream_matches(&mut stream, &chain, Some(split_slot)).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn stream_envelopes_empty_roots() {
|
||||
let (mut mock, _runtime) = mock_adapter();
|
||||
mock.expect_get_split_slot().return_const(Slot::new(0));
|
||||
|
||||
let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRange);
|
||||
let mut stream = streamer.launch_stream(vec![]);
|
||||
assert!(
|
||||
stream.next().await.is_none(),
|
||||
"empty roots should produce no results"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn stream_envelopes_single_root() {
|
||||
let chain = build_chain(3, &[], &[], &[]);
|
||||
let (mut mock, _runtime) = mock_adapter();
|
||||
mock.expect_get_split_slot().return_const(Slot::new(0));
|
||||
mock_envelopes(&mut mock, &chain);
|
||||
mock_canonical_head(&mut mock, &chain);
|
||||
|
||||
let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRange);
|
||||
let mut stream = streamer.launch_stream(vec![chain[1].block_root]);
|
||||
|
||||
let (root, result) = stream.next().await.expect("should get one result");
|
||||
assert_eq!(root, chain[1].block_root);
|
||||
let envelope = unwrap_result(&result)
|
||||
.as_ref()
|
||||
.expect("should have envelope");
|
||||
assert_eq!(
|
||||
envelope.block_hash(),
|
||||
chain[1].envelope.as_ref().unwrap().block_hash(),
|
||||
);
|
||||
|
||||
assert!(stream.next().await.is_none(), "stream should be exhausted");
|
||||
}
|
||||
|
||||
/// ByRoot requests skip canonical verification, so non-canonical envelopes
|
||||
/// should still be returned. `block_has_canonical_payload` should never be called.
|
||||
#[tokio::test]
|
||||
async fn stream_envelopes_by_root() {
|
||||
let chain = build_chain(8, &[], &[], &[3, 5, 7]);
|
||||
let (mut mock, _runtime) = mock_adapter();
|
||||
mock.expect_get_split_slot().return_const(Slot::new(0));
|
||||
mock_envelopes(&mut mock, &chain);
|
||||
mock.expect_block_has_canonical_payload().times(0);
|
||||
|
||||
let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRoot);
|
||||
let mut stream = streamer.launch_stream(roots(&chain));
|
||||
|
||||
// Every envelope should come back as Some, even the non-canonical ones.
|
||||
for (i, entry) in chain.iter().enumerate() {
|
||||
let (root, result) = stream
|
||||
.next()
|
||||
.await
|
||||
.unwrap_or_else(|| panic!("stream ended early at index {i}"));
|
||||
assert_eq!(root, entry.block_root, "root mismatch at index {i}");
|
||||
|
||||
let envelope = unwrap_result(&result)
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| panic!("expected Some at index {i} for ByRoot request"));
|
||||
let expected_envelope = entry.envelope.as_ref().unwrap();
|
||||
assert_eq!(
|
||||
envelope.block_hash(),
|
||||
expected_envelope.block_hash(),
|
||||
"block_hash mismatch at index {i}"
|
||||
);
|
||||
}
|
||||
|
||||
assert!(stream.next().await.is_none(), "stream should be exhausted");
|
||||
}
|
||||
|
||||
/// When `block_has_canonical_payload` returns an error, the streamer should
|
||||
/// yield `Err(EnvelopeStreamerError(BlockMissingFromForkChoice))` for those roots.
|
||||
#[tokio::test]
|
||||
async fn stream_envelopes_error() {
|
||||
let chain = build_chain(4, &[], &[], &[]);
|
||||
let (mut mock, _runtime) = mock_adapter();
|
||||
mock.expect_get_split_slot().return_const(Slot::new(0));
|
||||
mock_envelopes(&mut mock, &chain);
|
||||
mock.expect_block_has_canonical_payload()
|
||||
.returning(|_| Err(BeaconChainError::CanonicalHeadLockTimeout));
|
||||
|
||||
let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRange);
|
||||
let mut stream = streamer.launch_stream(roots(&chain));
|
||||
|
||||
for (i, entry) in chain.iter().enumerate() {
|
||||
let (root, result) = stream
|
||||
.next()
|
||||
.await
|
||||
.unwrap_or_else(|| panic!("stream ended early at index {i}"));
|
||||
assert_eq!(root, entry.block_root, "root mismatch at index {i}");
|
||||
assert!(
|
||||
matches!(
|
||||
result.as_ref(),
|
||||
Err(BeaconChainError::EnvelopeStreamerError(
|
||||
Error::BlockMissingFromForkChoice
|
||||
))
|
||||
),
|
||||
"expected BlockMissingFromForkChoice error at index {i}, got {:?}",
|
||||
result
|
||||
);
|
||||
}
|
||||
|
||||
assert!(stream.next().await.is_none(), "stream should be exhausted");
|
||||
}
|
||||
|
||||
/// Requesting unknown roots (not in the store) via ByRange should return Ok(None).
|
||||
#[tokio::test]
|
||||
async fn stream_envelopes_by_range_unknown_roots() {
|
||||
let (mut mock, _runtime) = mock_adapter();
|
||||
mock.expect_get_split_slot().return_const(Slot::new(0));
|
||||
mock.expect_get_payload_envelope().returning(|_| Ok(None));
|
||||
|
||||
let unknown_roots: Vec<Hash256> = (1..=4)
|
||||
.map(|i| Hash256::from_low_u64_be(i * 1000))
|
||||
.collect();
|
||||
|
||||
let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRange);
|
||||
let mut stream = streamer.launch_stream(unknown_roots.clone());
|
||||
|
||||
for (i, expected_root) in unknown_roots.iter().enumerate() {
|
||||
let (root, result) = stream
|
||||
.next()
|
||||
.await
|
||||
.unwrap_or_else(|| panic!("stream ended early at index {i}"));
|
||||
assert_eq!(root, *expected_root, "root mismatch at index {i}");
|
||||
let envelope = unwrap_result(&result);
|
||||
assert!(
|
||||
envelope.is_none(),
|
||||
"expected None for unknown root at index {i}"
|
||||
);
|
||||
}
|
||||
|
||||
assert!(stream.next().await.is_none(), "stream should be exhausted");
|
||||
}
|
||||
|
||||
/// Requesting roots via ByRoot where some envelopes are missing should
|
||||
/// return Ok(None) for those roots.
|
||||
#[tokio::test]
|
||||
async fn stream_envelopes_by_root_missing_envelopes() {
|
||||
let chain = build_chain(6, &[], &[2, 4], &[]);
|
||||
let (mut mock, _runtime) = mock_adapter();
|
||||
mock.expect_get_split_slot().return_const(Slot::new(0));
|
||||
mock_envelopes(&mut mock, &chain);
|
||||
mock.expect_block_has_canonical_payload().times(0);
|
||||
|
||||
let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRoot);
|
||||
let mut stream = streamer.launch_stream(roots(&chain));
|
||||
|
||||
for (i, entry) in chain.iter().enumerate() {
|
||||
let (root, result) = stream
|
||||
.next()
|
||||
.await
|
||||
.unwrap_or_else(|| panic!("stream ended early at index {i}"));
|
||||
assert_eq!(root, entry.block_root, "root mismatch at index {i}");
|
||||
|
||||
let envelope_opt = unwrap_result(&result);
|
||||
if let Some(entry_envelope) = &entry.envelope {
|
||||
let envelope = envelope_opt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| panic!("expected Some at index {i}"));
|
||||
assert_eq!(
|
||||
envelope.block_hash(),
|
||||
entry_envelope.block_hash(),
|
||||
"block_hash mismatch at index {i}"
|
||||
);
|
||||
} else {
|
||||
assert!(
|
||||
envelope_opt.is_none(),
|
||||
"expected None for missing envelope at index {i}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
assert!(stream.next().await.is_none(), "stream should be exhausted");
|
||||
}
|
||||
Reference in New Issue
Block a user