From 2c76ee5b6b03cdcd43563e89d1befa7f07f4cc75 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 20 May 2026 06:56:49 -0600 Subject: [PATCH 1/3] Gloas lookup sync boilerplate (#9322) Implements the boring boilerplate to send envelopes by root requests and process them. Pre-step to - https://github.com/sigp/lighthouse/pull/9155 Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> --- beacon_node/beacon_chain/src/beacon_chain.rs | 6 + beacon_node/beacon_processor/src/lib.rs | 10 ++ .../src/scheduler/work_queue.rs | 6 + .../src/service/api_types.rs | 2 + .../src/network_beacon_processor/mod.rs | 19 +++ .../network_beacon_processor/sync_methods.rs | 57 +++++++ beacon_node/network/src/router.rs | 35 ++++- .../network/src/sync/block_lookups/mod.rs | 2 + beacon_node/network/src/sync/manager.rs | 66 +++++++- .../network/src/sync/network_context.rs | 145 +++++++++++++++++- .../src/sync/network_context/requests.rs | 4 + .../requests/payload_envelopes_by_root.rs | 54 +++++++ 12 files changed, 398 insertions(+), 8 deletions(-) create mode 100644 beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index af8cd477d6..f3f6cd299e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -6339,6 +6339,12 @@ impl BeaconChain { .contains_block(root) } + pub fn envelope_is_known_to_fork_choice(&self, root: &Hash256) -> bool { + self.canonical_head + .fork_choice_read_lock() + .is_payload_received(root) + } + /// Determines the beacon proposer for the next slot. If that proposer is registered in the /// `execution_layer`, provide the `execution_layer` with the necessary information to produce /// `PayloadAttributes` for future calls to fork choice. diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 25944bcf8a..ce3851ea54 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -418,6 +418,7 @@ pub enum Work { process_fn: AsyncFn, }, RpcCustodyColumn(AsyncFn), + RpcEnvelope(AsyncFn), ColumnReconstruction(AsyncFn), IgnoredRpcBlock { process_fn: BlockingFn, @@ -485,6 +486,7 @@ pub enum WorkType { RpcBlock, RpcBlobs, RpcCustodyColumn, + RpcEnvelope, ColumnReconstruction, IgnoredRpcBlock, ChainSegment, @@ -548,6 +550,7 @@ impl Work { Work::RpcBlock { .. } => WorkType::RpcBlock, Work::RpcBlobs { .. } => WorkType::RpcBlobs, Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn, + Work::RpcEnvelope(_) => WorkType::RpcEnvelope, Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction, Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock, Work::ChainSegment { .. } => WorkType::ChainSegment, @@ -825,6 +828,8 @@ impl BeaconProcessor { Some(item) } else if let Some(item) = work_queues.rpc_custody_column_queue.pop() { Some(item) + } else if let Some(item) = work_queues.rpc_envelope_queue.pop() { + Some(item) // Check delayed blocks before gossip blocks, the gossip blocks might rely // on the delayed ones. } else if let Some(item) = work_queues.delayed_block_queue.pop() { @@ -1192,6 +1197,9 @@ impl BeaconProcessor { work_queues.rpc_block_queue.push(work, work_id) } Work::RpcBlobs { .. } => work_queues.rpc_blob_queue.push(work, work_id), + Work::RpcEnvelope(_) => { + work_queues.rpc_envelope_queue.push(work, work_id) + } Work::RpcCustodyColumn { .. } => { work_queues.rpc_custody_column_queue.push(work, work_id) } @@ -1330,6 +1338,7 @@ impl BeaconProcessor { WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => { work_queues.rpc_blob_queue.len() } + WorkType::RpcEnvelope => work_queues.rpc_envelope_queue.len(), WorkType::RpcCustodyColumn => work_queues.rpc_custody_column_queue.len(), WorkType::ColumnReconstruction => { work_queues.column_reconstruction_queue.len() @@ -1523,6 +1532,7 @@ impl BeaconProcessor { } | Work::RpcBlobs { process_fn } | Work::RpcCustodyColumn(process_fn) + | Work::RpcEnvelope(process_fn) | Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn), Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), Work::GossipBlock(work) diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs index eb57b97df2..2fdc15182c 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs @@ -120,6 +120,7 @@ pub struct BeaconProcessorQueueLengths { rpc_block_queue: usize, rpc_blob_queue: usize, rpc_custody_column_queue: usize, + rpc_envelope_queue: usize, column_reconstruction_queue: usize, chain_segment_queue: usize, backfill_chain_segment: usize, @@ -195,6 +196,8 @@ impl BeaconProcessorQueueLengths { // We don't request more than `PARENT_DEPTH_TOLERANCE` (32) lookups, so we can limit // this queue size. With 48 max blobs per block, each column sidecar list could be up to 12MB. rpc_custody_column_queue: 64, + // Bounded by `PARENT_DEPTH_TOLERANCE`; one envelope per Gloas block. + rpc_envelope_queue: 1024, column_reconstruction_queue: 1, chain_segment_queue: 64, backfill_chain_segment: 64, @@ -253,6 +256,7 @@ pub struct WorkQueues { pub rpc_block_queue: FifoQueue>, pub rpc_blob_queue: FifoQueue>, pub rpc_custody_column_queue: FifoQueue>, + pub rpc_envelope_queue: FifoQueue>, pub column_reconstruction_queue: LifoQueue>, pub chain_segment_queue: FifoQueue>, pub backfill_chain_segment: FifoQueue>, @@ -323,6 +327,7 @@ impl WorkQueues { let rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue); let rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue); let rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue); + let rpc_envelope_queue = FifoQueue::new(queue_lengths.rpc_envelope_queue); let column_reconstruction_queue = LifoQueue::new(queue_lengths.column_reconstruction_queue); let chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); let backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); @@ -391,6 +396,7 @@ impl WorkQueues { rpc_block_queue, rpc_blob_queue, rpc_custody_column_queue, + rpc_envelope_queue, chain_segment_queue, column_reconstruction_queue, backfill_chain_segment, diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index f598f59aee..2429b813e9 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -23,6 +23,8 @@ pub enum SyncRequestId { SingleBlock { id: SingleLookupReqId }, /// Request searching for a set of blobs given a hash. SingleBlob { id: SingleLookupReqId }, + /// Request searching for a payload envelope given a hash. + SinglePayloadEnvelope { id: SingleLookupReqId }, /// Request searching for a set of data columns given a hash and list of column indices. DataColumnsByRoot(DataColumnsByRootRequestId), /// Blocks by range request diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 7bf969db10..7817feb0bd 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -588,6 +588,25 @@ impl NetworkBeaconProcessor { }) } + /// Create a new `Work` event for an RPC-fetched payload envelope. `process_lookup_envelope` + /// reports the result back to sync. + pub fn send_lookup_envelope( + self: &Arc, + block_root: Hash256, + envelope: Arc>, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> Result<(), Error> { + let s = self.clone(); + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::RpcEnvelope(Box::pin(async move { + s.process_lookup_envelope(block_root, envelope, seen_timestamp, process_type) + .await; + })), + }) + } + /// Create a new `Work` event for some custody columns. `process_rpc_custody_columns` reports /// the result back to sync. pub fn send_rpc_custody_columns( diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 988a68c9dd..e3ba6fb3c4 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -426,6 +426,63 @@ impl NetworkBeaconProcessor { }); } + /// Attempt to verify and import an execution payload envelope received via RPC. + #[instrument( + name = "lh_process_lookup_envelope", + parent = None, + level = "debug", + skip_all, + fields(?block_root), + )] + pub async fn process_lookup_envelope( + self: Arc>, + block_root: Hash256, + envelope: Arc>, + _seen_timestamp: Duration, + process_type: BlockProcessType, + ) { + debug!( + ?block_root, + slot = %envelope.slot(), + ?process_type, + "Processing RPC payload envelope" + ); + + // Gossip verification runs the same signature / slot / builder-index / block-hash checks + // independently of gossip propagation, so we can reuse it for RPC-fetched envelopes. + #[allow(clippy::result_large_err)] + let result = match self + .chain + .clone() + .verify_envelope_for_gossip(envelope.clone()) + .await + { + Ok(verified) => { + self.chain + .process_execution_payload_envelope( + block_root, + verified, + NotifyExecutionLayer::Yes, + BlockImportSource::Lookup, + || Ok(()), + ) + .await + } + Err(e) => Err(e), + }; + + // TODO(gloas): structured penalty classification arrives with the envelope lookup state + // machine; for now, fold the EnvelopeError into BlockError::InternalError so it flows + // through the existing `BlockProcessingResult::Err` path. + let result: Result = + result.map_err(|e| BlockError::InternalError(format!("envelope: {e}"))); + + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type, + result: result.into(), + }); + } + pub fn process_historic_data_columns( &self, batch_id: CustodyBackfillBatchId, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index a718997e0a..35939c6f39 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -26,6 +26,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, trace, warn}; use types::{ BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, PartialDataColumn, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, }; /// Handles messages from the network and routes them to the appropriate service to be handled. @@ -348,10 +349,13 @@ impl Router { Response::DataColumnsByRange(data_column) => { self.on_data_columns_by_range_response(peer_id, app_request_id, data_column); } - // TODO(EIP-7732): implement outgoing payload envelopes by range and root - // responses once sync manager requests them. - Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => { - debug!("Requesting envelopes by root and by range not supported yet"); + Response::PayloadEnvelopesByRoot(envelope) => { + self.on_payload_envelopes_by_root_response(peer_id, app_request_id, envelope); + } + // TODO(EIP-7732): implement outgoing payload envelopes by range responses + // once sync manager requests them. + Response::PayloadEnvelopesByRange(_) => { + debug!("Requesting envelopes by range not supported yet"); } // Lighthouse currently only serves BlocksByHead and does not issue it as a client, // so receiving a response is unexpected. Drop it without crashing. @@ -821,6 +825,29 @@ impl Router { } } + /// Handle a `PayloadEnvelopesByRoot` response from the peer. + pub fn on_payload_envelopes_by_root_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + envelope: Option>>, + ) { + let sync_request_id = match app_request_id { + AppRequestId::Sync(id @ SyncRequestId::SinglePayloadEnvelope { .. }) => id, + other => { + crit!(request = ?other, %peer_id, "PayloadEnvelopesByRoot response on incorrect request"); + return; + } + }; + + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope, + seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(), + }); + } + fn handle_beacon_processor_send_result( &mut self, result: Result<(), crate::network_beacon_processor::Error>, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 3929f74aa0..f10610c751 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -559,6 +559,8 @@ impl BlockLookups { BlockProcessType::SingleCustodyColumn(id) => { self.on_processing_result_inner::>(id, result, cx) } + // TODO(gloas): route into the payload envelope lookup state machine. + BlockProcessType::SinglePayloadEnvelope(_) => Ok(LookupResult::Pending), }; self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx); } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 347b018a93..14a38f0e72 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -73,7 +73,8 @@ use strum::IntoStaticStr; use tokio::sync::mpsc; use tracing::{debug, error, info, trace}; use types::{ - BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, + BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, }; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -132,6 +133,14 @@ pub enum SyncMessage { seen_timestamp: Duration, }, + /// A payload envelope has been received from the RPC. + RpcPayloadEnvelope { + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelope: Option>>, + seen_timestamp: Duration, + }, + /// A block with an unknown parent has been received. UnknownParentBlock(PeerId, Arc>, Hash256), @@ -193,6 +202,7 @@ pub enum BlockProcessType { SingleBlock { id: Id }, SingleBlob { id: Id }, SingleCustodyColumn(Id), + SinglePayloadEnvelope(Id), } impl BlockProcessType { @@ -200,7 +210,8 @@ impl BlockProcessType { match self { BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } - | BlockProcessType::SingleCustodyColumn(id) => *id, + | BlockProcessType::SingleCustodyColumn(id) + | BlockProcessType::SinglePayloadEnvelope(id) => *id, } } } @@ -502,6 +513,9 @@ impl SyncManager { SyncRequestId::SingleBlob { id } => { self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error)) } + SyncRequestId::SinglePayloadEnvelope { id } => { + self.on_single_payload_envelope_response(id, peer_id, RpcEvent::RPCError(error)) + } SyncRequestId::DataColumnsByRoot(req_id) => { self.on_data_columns_by_root_response(req_id, peer_id, RpcEvent::RPCError(error)) } @@ -848,6 +862,17 @@ impl SyncManager { } => { self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp) } + SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope, + seen_timestamp, + } => self.rpc_payload_envelope_received( + sync_request_id, + peer_id, + envelope, + seen_timestamp, + ), SyncMessage::UnknownParentBlock(peer_id, block, block_root) => { let block_slot = block.slot(); let parent_root = block.parent_root(); @@ -1209,6 +1234,27 @@ impl SyncManager { } } + // TODO(gloas): dispatch into block_lookups once the envelope lookup state machine lands. + fn rpc_payload_envelope_received( + &mut self, + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelope: Option>>, + seen_timestamp: Duration, + ) { + match sync_request_id { + SyncRequestId::SinglePayloadEnvelope { id } => self + .on_single_payload_envelope_response( + id, + peer_id, + RpcEvent::from_chunk(envelope, seen_timestamp), + ), + _ => { + crit!(%peer_id, "bad request id for payload envelope"); + } + } + } + fn rpc_data_column_received( &mut self, sync_request_id: SyncRequestId, @@ -1237,6 +1283,22 @@ impl SyncManager { } } + fn on_single_payload_envelope_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + envelope: RpcEvent>>, + ) { + if let Some(_resp) = self + .network + .on_single_payload_envelope_response(id, peer_id, envelope) + { + // TODO(gloas): dispatch into + // `block_lookups.on_download_response::>(...)` once + // the envelope lookup state machine lands. + } + } + fn on_single_blob_response( &mut self, id: SingleLookupReqId, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 465e23998b..9d5ac40c0a 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -2,7 +2,10 @@ //! channel and stores a global RPC ID to perform requests. use self::custody::{ActiveCustodyRequest, Error as CustodyRequestError}; -pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest}; +pub use self::requests::{ + BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest, + PayloadEnvelopesByRootSingleRequest, +}; use super::SyncMessage; use super::block_sidecar_coupling::RangeBlockComponentsRequest; use super::manager::BlockProcessType; @@ -37,6 +40,7 @@ pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, + PayloadEnvelopesByRootRequestItems, }; #[cfg(test)] use slot_clock::SlotClock; @@ -52,7 +56,7 @@ use tracing::{Span, debug, debug_span, error, warn}; use types::data::FixedBlobSidecarList; use types::{ BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - ForkContext, Hash256, SignedBeaconBlock, Slot, + ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; pub mod custody; @@ -201,6 +205,9 @@ pub struct SyncNetworkContext { ActiveRequests>, /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups. blobs_by_root_requests: ActiveRequests>, + /// A mapping of active PayloadEnvelopesByRoot requests + payload_envelopes_by_root_requests: + ActiveRequests>, /// A mapping of active DataColumnsByRoot requests data_columns_by_root_requests: ActiveRequests>, @@ -294,6 +301,7 @@ impl SyncNetworkContext { request_id: 1, blocks_by_root_requests: ActiveRequests::new("blocks_by_root"), blobs_by_root_requests: ActiveRequests::new("blobs_by_root"), + payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"), data_columns_by_root_requests: ActiveRequests::new("data_columns_by_root"), blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), @@ -322,6 +330,7 @@ impl SyncNetworkContext { request_id: _, blocks_by_root_requests, blobs_by_root_requests, + payload_envelopes_by_root_requests, data_columns_by_root_requests, blocks_by_range_requests, blobs_by_range_requests, @@ -345,6 +354,10 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|id| SyncRequestId::SingleBlob { id: *id }); + let payload_envelopes_by_root_ids = payload_envelopes_by_root_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|id| SyncRequestId::SinglePayloadEnvelope { id: *id }); let data_column_by_root_ids = data_columns_by_root_requests .active_requests_of_peer(peer_id) .into_iter() @@ -363,6 +376,7 @@ impl SyncNetworkContext { .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); blocks_by_root_ids .chain(blobs_by_root_ids) + .chain(payload_envelopes_by_root_ids) .chain(data_column_by_root_ids) .chain(blocks_by_range_ids) .chain(blobs_by_range_ids) @@ -419,6 +433,7 @@ impl SyncNetworkContext { request_id: _, blocks_by_root_requests, blobs_by_root_requests, + payload_envelopes_by_root_requests, data_columns_by_root_requests, blocks_by_range_requests, blobs_by_range_requests, @@ -441,6 +456,7 @@ impl SyncNetworkContext { for peer_id in blocks_by_root_requests .iter_request_peers() .chain(blobs_by_root_requests.iter_request_peers()) + .chain(payload_envelopes_by_root_requests.iter_request_peers()) .chain(data_columns_by_root_requests.iter_request_peers()) .chain(blocks_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers()) @@ -927,6 +943,81 @@ impl SyncNetworkContext { Ok(LookupRequestResult::RequestSent(id.req_id)) } + /// Request a payload envelope for a block root via PayloadEnvelopesByRoot RPC. + #[allow(dead_code)] + pub fn payload_lookup_request( + &mut self, + lookup_id: SingleLookupId, + lookup_peers: Arc>>, + block_root: Hash256, + ) -> Result { + // Skip the download if fork-choice already saw this envelope (e.g. imported via gossip + // before the lookup got here). + if self.chain.envelope_is_known_to_fork_choice(&block_root) { + return Ok(LookupRequestResult::NoRequestNeeded( + "envelope already known to fork-choice", + )); + } + + let active_request_count_by_peer = self.active_request_count_by_peer(); + let Some(peer_id) = lookup_peers + .read() + .iter() + .map(|peer| { + ( + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, peer)| *peer) + else { + return Ok(LookupRequestResult::Pending("no peers")); + }; + + let id = SingleLookupReqId { + lookup_id, + req_id: self.next_id(), + }; + + let request = PayloadEnvelopesByRootSingleRequest { block_root }; + + let network_request = RequestType::PayloadEnvelopesByRoot( + request + .clone() + .into_request(&self.fork_context) + .map_err(RpcRequestSendError::InternalError)?, + ); + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: network_request, + app_request_id: AppRequestId::Sync(SyncRequestId::SinglePayloadEnvelope { id }), + }) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "PayloadEnvelopesByRoot", + ?block_root, + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + self.payload_envelopes_by_root_requests.insert( + id, + peer_id, + // true = enforce that the peer returns a response. We only request a single envelope + // and the peer must have it. + true, + PayloadEnvelopesByRootRequestItems::new(request), + Span::none(), + ); + + Ok(LookupRequestResult::RequestSent(id.req_id)) + } + /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: /// - If we have a downloaded but not yet processed block /// - If the da_checker has a pending block @@ -1476,6 +1567,27 @@ impl SyncNetworkContext { self.on_rpc_response_result(resp, peer_id) } + pub(crate) fn on_single_payload_envelope_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>> { + let resp = self + .payload_envelopes_by_root_requests + .on_response(id, rpc_event); + let resp = resp.map(|res| { + res.and_then(|(mut envelopes, seen_timestamp)| { + match envelopes.pop() { + Some(envelope) => Ok((envelope, seen_timestamp)), + // Should never happen, we enforce at least 1 chunk. + None => Err(LookupVerifyError::NotEnoughResponsesReturned { actual: 0 }.into()), + } + }) + }); + self.on_rpc_response_result(resp, peer_id) + } + #[allow(clippy::type_complexity)] pub(crate) fn on_data_columns_by_root_response( &mut self, @@ -1652,6 +1764,35 @@ impl SyncNetworkContext { }) } + #[allow(dead_code)] + pub fn send_payload_for_processing( + &self, + block_root: Hash256, + envelope: Arc>, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> Result<(), SendErrorProcessor> { + let beacon_processor = self + .beacon_processor_if_enabled() + .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; + + debug!( + ?block_root, + ?process_type, + "Sending payload envelope for processing" + ); + + beacon_processor + .send_lookup_envelope(block_root, envelope, seen_timestamp, process_type) + .map_err(|e| { + error!( + error = ?e, + "Failed to send sync payload envelope to processor" + ); + SendErrorProcessor::SendError + }) + } + pub fn send_custody_columns_for_processing( &self, _id: Id, diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index ad60dffb45..8c091eca80 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -16,6 +16,9 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems; pub use data_columns_by_root::{ DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; +pub use payload_envelopes_by_root::{ + PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest, +}; use crate::metrics; @@ -27,6 +30,7 @@ mod blocks_by_range; mod blocks_by_root; mod data_columns_by_range; mod data_columns_by_root; +mod payload_envelopes_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { diff --git a/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs new file mode 100644 index 0000000000..a142d86e90 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs @@ -0,0 +1,54 @@ +use lighthouse_network::rpc::methods::PayloadEnvelopesByRootRequest; +use std::sync::Arc; +use types::{EthSpec, ForkContext, Hash256, SignedExecutionPayloadEnvelope}; + +use super::{ActiveRequestItems, LookupVerifyError}; + +#[derive(Debug, Clone)] +pub struct PayloadEnvelopesByRootSingleRequest { + pub block_root: Hash256, +} + +impl PayloadEnvelopesByRootSingleRequest { + pub fn into_request( + self, + fork_context: &ForkContext, + ) -> Result { + PayloadEnvelopesByRootRequest::new(vec![self.block_root], fork_context) + } +} + +pub struct PayloadEnvelopesByRootRequestItems { + request: PayloadEnvelopesByRootSingleRequest, + items: Vec>>, +} + +impl PayloadEnvelopesByRootRequestItems { + pub fn new(request: PayloadEnvelopesByRootSingleRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for PayloadEnvelopesByRootRequestItems { + type Item = Arc>; + + /// Append a response to the single chunk request. We expect exactly one envelope per + /// block root. Returns `true` when the single expected item has been received. + fn add(&mut self, envelope: Self::Item) -> Result { + let block_root = envelope.message.beacon_block_root; + if self.request.block_root != block_root { + return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); + } + + self.items.push(envelope); + // Always returns true, we expect a single envelope per block root + Ok(true) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +} From a9637c16502abb0215b9a76b7067207b6bc70d8c Mon Sep 17 00:00:00 2001 From: Daniel Knopik <107140945+dknopik@users.noreply.github.com> Date: Thu, 21 May 2026 05:25:02 +0200 Subject: [PATCH 2/3] Partial columns cleanup (#9321) #8314 left a few ugly potentially panicking location behind - all of them believed to be unreachable, but this PR fixes them regardless for good hygiene. Update to `ethereum_ssz 0.10.4` for two new helpers: `not_inplace` and `clone_zeroed`. Remove remaining `expect` and `todo!` in favour of these helpers and one new fallible (but practically infallible) method. Co-Authored-By: Daniel Knopik --- Cargo.lock | 4 ++-- .../src/data_column_verification.rs | 11 ++++++++- beacon_node/http_api/src/publish_blocks.rs | 12 ++++++---- .../lighthouse_network/src/types/partial.rs | 18 +++++--------- .../gossip_methods.rs | 24 +++++++++++-------- .../types/src/data/data_column_sidecar.rs | 17 ++++++------- 6 files changed, 47 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 078f699f3c..d42bcd8fc1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3282,9 +3282,9 @@ dependencies = [ [[package]] name = "ethereum_ssz" -version = "0.10.3" +version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368a4a4e4273b0135111fe9464e35465067766a8f664615b5a86338b73864407" +checksum = "e462875ad8693755ea8913d6e905715c76ea4836e2254e18c9cf0f7a8f8c2a13" dependencies = [ "alloy-primitives", "arbitrary", diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 71562b376b..45cd687b36 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -1220,7 +1220,16 @@ pub fn validate_partial_data_column_sidecar_for_gossip( header, }; } - Err(MissingCellsError::UnexpectedError(e)) => todo!("handle unexpected error {:?}", e), + Err(MissingCellsError::UnexpectedError(e)) => { + return PartialColumnVerificationResult::ErrWithValidHeader { + err: GossipDataColumnError::InternalError(format!( + "An unexpected error occurred while validating partial data columns: {:?}", + e + )) + .into(), + header, + }; + } }; // We do not have to check block related data here, as we create the verifiable column from diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index e96c86b17f..ca4ab85524 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -524,11 +524,15 @@ pub(crate) fn publish_column_sidecars( if chain.config.enable_partial_columns && let DataColumnSidecar::Fulu(fulu_data_col) = data_col.as_ref() { - let mut partial = fulu_data_col.to_partial(); - if let Some(header) = partial.sidecar.header.take() { - partial_header = Some(header); + match fulu_data_col.to_partial() { + Ok(mut partial) => { + if let Some(header) = partial.sidecar.header.take() { + partial_header = Some(header); + } + partial_columns.push(Arc::new(partial)); + } + Err(err) => crit!(?err, "Could not convert from full to partial"), } - partial_columns.push(Arc::new(partial)); } let subnet = DataColumnSubnetId::from_column_index(*data_col.index(), &chain.spec); diff --git a/beacon_node/lighthouse_network/src/types/partial.rs b/beacon_node/lighthouse_network/src/types/partial.rs index f25ce9ec36..26705b7106 100644 --- a/beacon_node/lighthouse_network/src/types/partial.rs +++ b/beacon_node/lighthouse_network/src/types/partial.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use tracing::{debug, error}; use types::core::{EthSpec, Hash256}; use types::data::{ - CellBitmap, PartialDataColumn, PartialDataColumnHeader, PartialDataColumnPartsMetadata, + PartialDataColumn, PartialDataColumnHeader, PartialDataColumnPartsMetadata, PartialDataColumnSidecar, PartialDataColumnSidecarRef, }; @@ -32,12 +32,8 @@ impl OutgoingPartialColumn { header_sent_set: HeaderSentSet, ) -> Self { // For now, always request all cells - let mut requests = partial_column.sidecar.cells_present_bitmap.clone(); - for idx in 0..requests.len() { - requests - .set(idx, true) - .expect("Bound asserted via `len` above"); - } + let mut requests = partial_column.sidecar.cells_present_bitmap.clone_zeroed(); + requests.not_inplace(); let metadata = PartialDataColumnPartsMetadata:: { available: partial_column.sidecar.cells_present_bitmap.clone(), requests, @@ -45,10 +41,7 @@ impl OutgoingPartialColumn { .into(); let header_message = PartialDataColumnSidecarRef { - cells_present_bitmap: CellBitmap::::with_capacity( - partial_column.sidecar.cells_present_bitmap.len(), - ) - .expect("Taking length from bitmap with same bound"), + cells_present_bitmap: partial_column.sidecar.cells_present_bitmap.clone_zeroed(), column: vec![], kzg_proofs: vec![], header: Some(header).into(), @@ -210,7 +203,7 @@ impl Partial for OutgoingPartialColumn { let send = self .partial_column .sidecar - .filter(|idx| want.get(idx).expect("Bound checked above")) + .filter(|idx| want.get(idx).unwrap_or(false)) .map_err(|err| { error!(?err, "Unexpected error filtering sidecar"); PartialError::InvalidFormat @@ -262,6 +255,7 @@ mod tests { use fixed_bytes::FixedBytesExtended; use libp2p::identity::Keypair; use ssz_types::FixedVector; + use types::CellBitmap; use types::block::{BeaconBlockHeader, SignedBeaconBlockHeader}; use types::core::{MinimalEthSpec, Slot}; use types::data::PartialDataColumnHeader; diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index d34668b138..7a902649cb 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1381,16 +1381,20 @@ impl NetworkBeaconProcessor { &[&data_column_index.to_string()], ); - let mut column = col.to_partial(); - let header = column.sidecar.header.take(); - if let Some(header) = header { - self.send_network_message(NetworkMessage::PublishPartialColumns { - columns: vec![Arc::new(column)], - header: Arc::new(header), - }); - } else { - crit!("Converting from full to partial yielded headerless partial") - }; + match col.to_partial() { + Ok(mut column) => { + let header = column.sidecar.header.take(); + if let Some(header) = header { + self.send_network_message(NetworkMessage::PublishPartialColumns { + columns: vec![Arc::new(column)], + header: Arc::new(header), + }); + } else { + crit!("Converting from full to partial yielded headerless partial") + }; + } + Err(err) => crit!(?err, "Could not convert from full to partial"), + } } let result = self diff --git a/consensus/types/src/data/data_column_sidecar.rs b/consensus/types/src/data/data_column_sidecar.rs index 170aa99666..d15651730f 100644 --- a/consensus/types/src/data/data_column_sidecar.rs +++ b/consensus/types/src/data/data_column_sidecar.rs @@ -250,19 +250,16 @@ impl DataColumnSidecarFulu { } /// Convert this full data column into a verifiable partial data column. - pub fn to_partial(&self) -> PartialDataColumn { + /// Note: This is not expected to ever fail. + pub fn to_partial(&self) -> Result, PartialDataColumnSidecarError> { let cell_count = self.column.len(); - let mut bitmap = - CellBitmap::::with_capacity(cell_count).expect("our column has the same bound"); - for idx in 0..cell_count { - bitmap - .set(idx, true) - .expect("The correct size is initialized right above"); - } + let mut bitmap = CellBitmap::::with_capacity(cell_count) + .map_err(|_| PartialDataColumnSidecarError::UnexpectedBounds)?; + bitmap.not_inplace(); let block_root = self.block_root(); - PartialDataColumn { + Ok(PartialDataColumn { block_root, index: self.index, sidecar: PartialDataColumnSidecar { @@ -276,7 +273,7 @@ impl DataColumnSidecarFulu { }) .into(), }, - } + }) } } From 1caaa10fa86cfe9ad47cffc03f7de81b3e6642e6 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 21 May 2026 02:35:35 -0600 Subject: [PATCH 3/3] Drop unused EthSpec generic from Stores (#9281) Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> --- beacon_node/beacon_chain/src/beacon_chain.rs | 4 +- .../src/beacon_fork_choice_store.rs | 12 ++-- beacon_node/beacon_chain/src/builder.rs | 19 +++---- .../overflow_lru_cache.rs | 10 ++-- beacon_node/beacon_chain/src/migrate.rs | 4 +- .../payload_attestation_verification/tests.rs | 2 +- .../beacon_chain/src/persisted_custody.rs | 4 +- beacon_node/beacon_chain/src/test_utils.rs | 18 +++--- .../beacon_chain/tests/op_verification.rs | 2 +- .../beacon_chain/tests/prepare_payload.rs | 8 +-- .../beacon_chain/tests/schema_stability.rs | 2 +- beacon_node/beacon_chain/tests/store_tests.rs | 14 ++--- beacon_node/client/src/builder.rs | 15 +++-- beacon_node/http_api/src/test_utils.rs | 2 +- .../src/network_beacon_processor/mod.rs | 3 +- beacon_node/network/src/persisted_dht.rs | 13 ++--- .../network/src/subnet_service/tests/mod.rs | 7 +-- beacon_node/network/src/sync/tests/mod.rs | 2 +- beacon_node/src/lib.rs | 2 +- beacon_node/store/src/database/interface.rs | 13 ++--- .../store/src/database/leveldb_impl.rs | 13 ++--- beacon_node/store/src/database/redb_impl.rs | 15 ++--- beacon_node/store/src/forwards_iter.rs | 18 +++--- beacon_node/store/src/hot_cold_store.rs | 16 +++--- beacon_node/store/src/invariants.rs | 2 +- beacon_node/store/src/iter.rs | 56 ++++++++----------- beacon_node/store/src/lib.rs | 8 +-- beacon_node/store/src/memory_store.rs | 12 ++-- beacon_node/store/src/reconstruct.rs | 4 +- consensus/fork_choice/tests/tests.rs | 2 +- database_manager/src/lib.rs | 22 ++++---- 31 files changed, 141 insertions(+), 183 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f3f6cd299e..2259e1d809 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -325,8 +325,8 @@ pub enum StateSkipConfig { } pub trait BeaconChainTypes: Send + Sync + 'static { - type HotStore: store::ItemStore; - type ColdStore: store::ItemStore; + type HotStore: store::ItemStore; + type ColdStore: store::ItemStore; type SlotClock: slot_clock::SlotClock; type EthSpec: types::EthSpec; } diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index 95fde28f5b..133eaa2fc6 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -129,8 +129,8 @@ impl BalancesCache { /// Implements `fork_choice::ForkChoiceStore` in order to provide a persistent backing to the /// `fork_choice::ForkChoice` struct. #[derive(Debug, Educe)] -#[educe(PartialEq(bound(E: EthSpec, Hot: ItemStore, Cold: ItemStore)))] -pub struct BeaconForkChoiceStore, Cold: ItemStore> { +#[educe(PartialEq(bound(E: EthSpec, Hot: ItemStore, Cold: ItemStore)))] +pub struct BeaconForkChoiceStore { #[educe(PartialEq(ignore))] store: Arc>, balances_cache: BalancesCache, @@ -150,8 +150,8 @@ pub struct BeaconForkChoiceStore, Cold: ItemStore< impl BeaconForkChoiceStore where E: EthSpec, - Hot: ItemStore, - Cold: ItemStore, + Hot: ItemStore, + Cold: ItemStore, { /// Initialize `Self` from some `anchor` checkpoint which may or may not be the genesis state. /// @@ -267,8 +267,8 @@ where impl ForkChoiceStore for BeaconForkChoiceStore where E: EthSpec, - Hot: ItemStore, - Cold: ItemStore, + Hot: ItemStore, + Cold: ItemStore, { type Error = Error; diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index e668bef7c0..61c026e0a9 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -60,8 +60,8 @@ pub struct Witness( impl BeaconChainTypes for Witness where - THotStore: ItemStore + 'static, - TColdStore: ItemStore + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, TSlotClock: SlotClock + 'static, E: EthSpec + 'static, { @@ -115,8 +115,8 @@ pub struct BeaconChainBuilder { impl BeaconChainBuilder> where - THotStore: ItemStore + 'static, - TColdStore: ItemStore + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, TSlotClock: SlotClock + 'static, E: EthSpec + 'static, { @@ -1162,8 +1162,8 @@ where impl BeaconChainBuilder> where - THotStore: ItemStore + 'static, - TColdStore: ItemStore + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, E: EthSpec + 'static, { /// Sets the `BeaconChain` slot clock to `TestingSlotClock`. @@ -1301,11 +1301,8 @@ mod test { let validator_count = 1; let genesis_time = 13_371_337; - let store: HotColdDB< - MinimalEthSpec, - MemoryStore, - MemoryStore, - > = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal().into()).unwrap(); + let store: HotColdDB = + HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal().into()).unwrap(); let spec = MinimalEthSpec::default_spec(); let genesis_state = interop_genesis_state( diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 3034e196b9..8a80f835ab 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -802,7 +802,7 @@ mod test { fn get_store_with_spec( db_path: &TempDir, spec: Arc, - ) -> Arc, BeaconNodeBackend>> { + ) -> Arc> { let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); let blobs_path = db_path.path().join("blobs_db"); @@ -860,8 +860,8 @@ mod test { ) where E: EthSpec, - Hot: ItemStore, - Cold: ItemStore, + Hot: ItemStore, + Cold: ItemStore, { let chain = &harness.chain; let head = chain.head_snapshot(); @@ -946,8 +946,8 @@ mod test { where E: EthSpec, T: BeaconChainTypes< - HotStore = BeaconNodeBackend, - ColdStore = BeaconNodeBackend, + HotStore = BeaconNodeBackend, + ColdStore = BeaconNodeBackend, EthSpec = E, >, { diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 3c17c1ebba..9c70bcafa2 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -30,7 +30,7 @@ pub const DEFAULT_EPOCHS_PER_MIGRATION: u64 = 1; /// The background migrator runs a thread to perform pruning and migrate state from the hot /// to the cold database. -pub struct BackgroundMigrator, Cold: ItemStore> { +pub struct BackgroundMigrator { db: Arc>, /// Record of when the last migration ran, for enforcing `epochs_per_migration`. prev_migration: Arc>, @@ -135,7 +135,7 @@ pub struct FinalizationNotification { pub prev_migration: Arc>, } -impl, Cold: ItemStore> BackgroundMigrator { +impl BackgroundMigrator { /// Create a new `BackgroundMigrator` and spawn its thread if necessary. pub fn new(db: Arc>, config: MigratorConfig) -> Self { // Estimate last migration run from DB split slot. diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs index 7faad98e55..c45df51ac8 100644 --- a/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs @@ -43,7 +43,7 @@ struct TestContext { keypairs: Vec, spec: ChainSpec, genesis_block_root: Hash256, - store: Arc, store::MemoryStore>>, + store: Arc>, } impl TestContext { diff --git a/beacon_node/beacon_chain/src/persisted_custody.rs b/beacon_node/beacon_chain/src/persisted_custody.rs index ba221c67b5..cc7219fa90 100644 --- a/beacon_node/beacon_chain/src/persisted_custody.rs +++ b/beacon_node/beacon_chain/src/persisted_custody.rs @@ -9,7 +9,7 @@ pub const CUSTODY_DB_KEY: Hash256 = Hash256::ZERO; pub struct PersistedCustody(pub CustodyContextSsz); -pub fn load_custody_context, Cold: ItemStore>( +pub fn load_custody_context( store: Arc>, ) -> Option { let res: Result, _> = @@ -22,7 +22,7 @@ pub fn load_custody_context, Cold: ItemStore>( } /// Attempt to persist the custody context object to `self.store`. -pub fn persist_custody_context, Cold: ItemStore>( +pub fn persist_custody_context( store: Arc>, custody_context: CustodyContextSsz, ) -> Result<(), store::Error> { diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 8e9cc61208..c2ccad7d8c 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -124,8 +124,8 @@ pub fn get_kzg(spec: &ChainSpec) -> Arc { pub type BaseHarnessType = Witness; -pub type DiskHarnessType = BaseHarnessType, BeaconNodeBackend>; -pub type EphemeralHarnessType = BaseHarnessType, MemoryStore>; +pub type DiskHarnessType = BaseHarnessType; +pub type EphemeralHarnessType = BaseHarnessType; pub type BoxedMutator = Box< dyn FnOnce( @@ -334,7 +334,7 @@ impl Builder> { /// Manually restore from a given `MemoryStore`. pub fn resumed_ephemeral_store( mut self, - store: Arc, MemoryStore>>, + store: Arc>, ) -> Self { let mutator = move |builder: BeaconChainBuilder<_>| { builder @@ -350,7 +350,7 @@ impl Builder> { /// Disk store, start from genesis. pub fn fresh_disk_store( mut self, - store: Arc, BeaconNodeBackend>>, + store: Arc>, ) -> Self { let validator_keypairs = self .validator_keypairs @@ -384,7 +384,7 @@ impl Builder> { /// Disk store, resume. pub fn resumed_disk_store( mut self, - store: Arc, BeaconNodeBackend>>, + store: Arc>, ) -> Self { let mutator = move |builder: BeaconChainBuilder<_>| { builder @@ -399,8 +399,8 @@ impl Builder> { impl Builder> where E: EthSpec, - Hot: ItemStore, - Cold: ItemStore, + Hot: ItemStore, + Cold: ItemStore, { pub fn new(eth_spec_instance: E) -> Self { let runtime = TestRuntime::default(); @@ -760,8 +760,8 @@ pub type HarnessSyncContributions = Vec<( impl BeaconChainHarness> where E: EthSpec, - Hot: ItemStore, - Cold: ItemStore, + Hot: ItemStore, + Cold: ItemStore, { pub fn builder(eth_spec_instance: E) -> Builder> { create_test_tracing_subscriber(); diff --git a/beacon_node/beacon_chain/tests/op_verification.rs b/beacon_node/beacon_chain/tests/op_verification.rs index 2f97f10745..adc14541a9 100644 --- a/beacon_node/beacon_chain/tests/op_verification.rs +++ b/beacon_node/beacon_chain/tests/op_verification.rs @@ -27,7 +27,7 @@ static KEYPAIRS: LazyLock> = type E = MinimalEthSpec; type TestHarness = BeaconChainHarness>; -type HotColdDB = store::HotColdDB, BeaconNodeBackend>; +type HotColdDB = store::HotColdDB; fn get_store(db_path: &TempDir) -> Arc { let spec = Arc::new(test_spec::()); diff --git a/beacon_node/beacon_chain/tests/prepare_payload.rs b/beacon_node/beacon_chain/tests/prepare_payload.rs index 47dd1ef517..de8bfb3865 100644 --- a/beacon_node/beacon_chain/tests/prepare_payload.rs +++ b/beacon_node/beacon_chain/tests/prepare_payload.rs @@ -34,7 +34,7 @@ type TestHarness = BeaconChainHarness>; fn get_store( db_path: &TempDir, spec: Arc, -) -> Arc, BeaconNodeBackend>> { +) -> Arc> { let store_config = StoreConfig { prune_payloads: false, ..StoreConfig::default() @@ -46,7 +46,7 @@ fn get_store_generic( db_path: &TempDir, config: StoreConfig, spec: Arc, -) -> Arc, BeaconNodeBackend>> { +) -> Arc> { create_test_tracing_subscriber(); let hot_path = db_path.path().join("chain_db"); let cold_path = db_path.path().join("freezer_db"); @@ -64,7 +64,7 @@ fn get_store_generic( } fn get_harness( - store: Arc, BeaconNodeBackend>>, + store: Arc>, validator_count: usize, ) -> TestHarness { // Most tests expect to retain historic states, so we use this as the default. @@ -81,7 +81,7 @@ fn get_harness( } fn get_harness_generic( - store: Arc, BeaconNodeBackend>>, + store: Arc>, validator_count: usize, chain_config: ChainConfig, node_custody_type: NodeCustodyType, diff --git a/beacon_node/beacon_chain/tests/schema_stability.rs b/beacon_node/beacon_chain/tests/schema_stability.rs index 8200748ae6..899a40511d 100644 --- a/beacon_node/beacon_chain/tests/schema_stability.rs +++ b/beacon_node/beacon_chain/tests/schema_stability.rs @@ -20,7 +20,7 @@ use tempfile::{TempDir, tempdir}; use types::{ChainSpec, Hash256, MainnetEthSpec, Slot}; type E = MainnetEthSpec; -type Store = Arc, BeaconNodeBackend>>; +type Store = Arc>; type TestHarness = BeaconChainHarness>; const VALIDATOR_COUNT: usize = 32; diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 0ff9f6841d..7e50f4e5ac 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -106,7 +106,7 @@ fn get_or_reconstruct_blobs( } } -fn get_store(db_path: &TempDir) -> Arc, BeaconNodeBackend>> { +fn get_store(db_path: &TempDir) -> Arc> { let store_config = StoreConfig { prune_payloads: false, ..StoreConfig::default() @@ -118,7 +118,7 @@ fn get_store_generic( db_path: &TempDir, config: StoreConfig, spec: ChainSpec, -) -> Arc, BeaconNodeBackend>> { +) -> Arc> { create_test_tracing_subscriber(); let hot_path = db_path.path().join("chain_db"); let cold_path = db_path.path().join("freezer_db"); @@ -136,7 +136,7 @@ fn get_store_generic( } fn get_harness( - store: Arc, BeaconNodeBackend>>, + store: Arc>, validator_count: usize, ) -> TestHarness { // Most tests expect to retain historic states, so we use this as the default. @@ -153,7 +153,7 @@ fn get_harness( } fn get_harness_import_all_data_columns( - store: Arc, BeaconNodeBackend>>, + store: Arc>, validator_count: usize, ) -> TestHarness { // Most tests expect to retain historic states, so we use this as the default. @@ -171,7 +171,7 @@ fn get_harness_import_all_data_columns( } fn get_harness_generic( - store: Arc, BeaconNodeBackend>>, + store: Arc>, validator_count: usize, chain_config: ChainConfig, node_custody_type: NodeCustodyType, @@ -205,7 +205,7 @@ fn check_db_invariants(harness: &TestHarness) { } fn get_states_descendant_of_block( - store: &HotColdDB, BeaconNodeBackend>, + store: &HotColdDB, block_root: Hash256, ) -> Vec<(Hash256, Slot)> { let summaries = store.load_hot_state_summaries().unwrap(); @@ -5859,7 +5859,7 @@ async fn test_gloas_hot_state_hierarchy() { /// Check that the HotColdDB's split_slot is equal to the start slot of the last finalized epoch. fn check_split_slot( harness: &TestHarness, - store: Arc, BeaconNodeBackend>>, + store: Arc>, ) { let split_slot = store.get_split_slot(); assert_eq!( diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index f532ef716e..0a3c414632 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -98,8 +98,8 @@ impl where TSlotClock: SlotClock + Clone + 'static, E: EthSpec + 'static, - THotStore: ItemStore + 'static, - TColdStore: ItemStore + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Instantiates a new, empty builder. /// @@ -815,8 +815,8 @@ impl where TSlotClock: SlotClock + Clone + 'static, E: EthSpec + 'static, - THotStore: ItemStore + 'static, - TColdStore: ItemStore + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Consumes the internal `BeaconChainBuilder`, attaching the resulting `BeaconChain` to self. #[instrument(skip_all)] @@ -847,8 +847,7 @@ where } } -impl - ClientBuilder, BeaconNodeBackend>> +impl ClientBuilder> where TSlotClock: SlotClock + 'static, E: EthSpec + 'static, @@ -889,8 +888,8 @@ where impl ClientBuilder> where E: EthSpec + 'static, - THotStore: ItemStore + 'static, - TColdStore: ItemStore + 'static, + THotStore: ItemStore + 'static, + TColdStore: ItemStore + 'static, { /// Specifies that the slot clock should read the time from the computers system clock. pub fn system_time_slot_clock(mut self) -> Result { diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index f27a04d17a..467a5216b1 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -57,7 +57,7 @@ pub struct ApiServer> { type HarnessBuilder = Builder>; type Initializer = Box) -> HarnessBuilder>; -type Mutator = BoxedMutator, MemoryStore>; +type Mutator = BoxedMutator; impl InteractiveTester { pub async fn new(spec: Option, validator_count: usize) -> Self { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 7817feb0bd..434f7ecc8b 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1267,8 +1267,7 @@ use { }; #[cfg(test)] -pub(crate) type TestBeaconChainType = - Witness, MemoryStore>; +pub(crate) type TestBeaconChainType = Witness; #[cfg(test)] impl NetworkBeaconProcessor> { diff --git a/beacon_node/network/src/persisted_dht.rs b/beacon_node/network/src/persisted_dht.rs index 113b3cdd32..3672f97113 100644 --- a/beacon_node/network/src/persisted_dht.rs +++ b/beacon_node/network/src/persisted_dht.rs @@ -6,7 +6,7 @@ use types::{EthSpec, Hash256}; /// 32-byte key for accessing the `DhtEnrs`. All zero because `DhtEnrs` has its own column. pub const DHT_DB_KEY: Hash256 = Hash256::ZERO; -pub fn load_dht, Cold: ItemStore>( +pub fn load_dht( store: Arc>, ) -> Vec { // Load DHT from store @@ -20,7 +20,7 @@ pub fn load_dht, Cold: ItemStore>( } /// Attempt to persist the ENR's in the DHT to `self.store`. -pub fn persist_dht, Cold: ItemStore>( +pub fn persist_dht( store: Arc>, enrs: Vec, ) -> Result<(), store::Error> { @@ -28,7 +28,7 @@ pub fn persist_dht, Cold: ItemStore>( } /// Attempts to clear any DHT entries. -pub fn clear_dht, Cold: ItemStore>( +pub fn clear_dht( store: Arc>, ) -> Result<(), store::Error> { store.hot_db.delete::(&DHT_DB_KEY) @@ -75,11 +75,8 @@ mod tests { use types::{ChainSpec, MinimalEthSpec}; #[test] fn test_persisted_dht() { - let store: HotColdDB< - MinimalEthSpec, - MemoryStore, - MemoryStore, - > = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal().into()).unwrap(); + let store: HotColdDB = + HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal().into()).unwrap(); let enrs = vec![Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap()]; store .put_item(&DHT_DB_KEY, &PersistedDht { enrs: enrs.clone() }) diff --git a/beacon_node/network/src/subnet_service/tests/mod.rs b/beacon_node/network/src/subnet_service/tests/mod.rs index 619154d738..745934053a 100644 --- a/beacon_node/network/src/subnet_service/tests/mod.rs +++ b/beacon_node/network/src/subnet_service/tests/mod.rs @@ -25,12 +25,7 @@ const SLOT_DURATION_MILLIS: u64 = 400; const TEST_LOG_LEVEL: Option<&str> = None; -type TestBeaconChainType = Witness< - SystemTimeSlotClock, - MainnetEthSpec, - MemoryStore, - MemoryStore, ->; +type TestBeaconChainType = Witness; pub struct TestBeaconChain { chain: Arc>, diff --git a/beacon_node/network/src/sync/tests/mod.rs b/beacon_node/network/src/sync/tests/mod.rs index dd8c3ae432..4e185cc081 100644 --- a/beacon_node/network/src/sync/tests/mod.rs +++ b/beacon_node/network/src/sync/tests/mod.rs @@ -26,7 +26,7 @@ use types::{ForkName, Hash256, MinimalEthSpec as E, Slot}; mod lookups; mod range; -type T = Witness, MemoryStore>; +type T = Witness; /// This test utility enables integration testing of Lighthouse sync components. /// diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index e33da17e26..6400427f8c 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -20,7 +20,7 @@ use types::{ChainSpec, Epoch, EthSpec, ForkName}; /// A type-alias to the tighten the definition of a production-intended `Client`. pub type ProductionClient = - Client, BeaconNodeBackend>>; + Client>; /// The beacon node `Client` that is used in production. /// diff --git a/beacon_node/store/src/database/interface.rs b/beacon_node/store/src/database/interface.rs index 5646f1179c..7e0a09a3e9 100644 --- a/beacon_node/store/src/database/interface.rs +++ b/beacon_node/store/src/database/interface.rs @@ -6,18 +6,17 @@ use crate::{ColumnIter, ColumnKeyIter, DBColumn, Error, ItemStore, Key, KeyValue use crate::{KeyValueStoreOp, StoreConfig, config::DatabaseBackend}; use std::collections::HashSet; use std::path::Path; -use types::EthSpec; -pub enum BeaconNodeBackend { +pub enum BeaconNodeBackend { #[cfg(feature = "leveldb")] - LevelDb(leveldb_impl::LevelDB), + LevelDb(leveldb_impl::LevelDB), #[cfg(feature = "redb")] - Redb(redb_impl::Redb), + Redb(redb_impl::Redb), } -impl ItemStore for BeaconNodeBackend {} +impl ItemStore for BeaconNodeBackend {} -impl KeyValueStore for BeaconNodeBackend { +impl KeyValueStore for BeaconNodeBackend { fn get_bytes(&self, column: DBColumn, key: &[u8]) -> Result>, Error> { match self { #[cfg(feature = "leveldb")] @@ -183,7 +182,7 @@ impl KeyValueStore for BeaconNodeBackend { } } -impl BeaconNodeBackend { +impl BeaconNodeBackend { pub fn open(config: &StoreConfig, path: &Path) -> Result { metrics::inc_counter_vec(&metrics::DISK_DB_TYPE, &[&config.backend.to_string()]); match config.backend { diff --git a/beacon_node/store/src/database/leveldb_impl.rs b/beacon_node/store/src/database/leveldb_impl.rs index 6e01648263..0531eb900e 100644 --- a/beacon_node/store/src/database/leveldb_impl.rs +++ b/beacon_node/store/src/database/leveldb_impl.rs @@ -15,15 +15,13 @@ use leveldb::{ options::{Options, ReadOptions}, }; use std::collections::HashSet; -use std::marker::PhantomData; use std::path::Path; -use types::{EthSpec, Hash256}; +use types::Hash256; use super::interface::WriteOptions; -pub struct LevelDB { +pub struct LevelDB { db: Database, - _phantom: PhantomData, } impl From for leveldb::options::WriteOptions { @@ -34,7 +32,7 @@ impl From for leveldb::options::WriteOptions { } } -impl LevelDB { +impl LevelDB { pub fn open(path: &Path) -> Result { let mut options = Options::new(); @@ -42,10 +40,7 @@ impl LevelDB { let db = Database::open(path, options)?; - Ok(Self { - db, - _phantom: PhantomData, - }) + Ok(Self { db }) } pub fn read_options(&self) -> ReadOptions<'_, BytesKey> { diff --git a/beacon_node/store/src/database/redb_impl.rs b/beacon_node/store/src/database/redb_impl.rs index 4077326eca..dc39f22114 100644 --- a/beacon_node/store/src/database/redb_impl.rs +++ b/beacon_node/store/src/database/redb_impl.rs @@ -3,17 +3,15 @@ use crate::{DBColumn, Error, KeyValueStoreOp}; use parking_lot::RwLock; use redb::TableDefinition; use std::collections::HashSet; -use std::{borrow::BorrowMut, marker::PhantomData, path::Path}; +use std::{borrow::BorrowMut, path::Path}; use strum::IntoEnumIterator; -use types::EthSpec; use super::interface::WriteOptions; pub const DB_FILE_NAME: &str = "database.redb"; -pub struct Redb { +pub struct Redb { db: RwLock, - _phantom: PhantomData, } impl From for redb::Durability { @@ -26,19 +24,16 @@ impl From for redb::Durability { } } -impl Redb { +impl Redb { pub fn open(path: &Path) -> Result { let db_file = path.join(DB_FILE_NAME); let db = redb::Database::create(db_file)?; for column in DBColumn::iter() { - Redb::::create_table(&db, column.into())?; + Self::create_table(&db, column.into())?; } - Ok(Self { - db: db.into(), - _phantom: PhantomData, - }) + Ok(Self { db: db.into() }) } fn create_table(db: &redb::Database, table_name: &str) -> Result<(), Error> { diff --git a/beacon_node/store/src/forwards_iter.rs b/beacon_node/store/src/forwards_iter.rs index 255b7d8eac..ef4312f506 100644 --- a/beacon_node/store/src/forwards_iter.rs +++ b/beacon_node/store/src/forwards_iter.rs @@ -9,7 +9,7 @@ pub type HybridForwardsBlockRootsIterator<'a, E, Hot, Cold> = pub type HybridForwardsStateRootsIterator<'a, E, Hot, Cold> = HybridForwardsIterator<'a, E, Hot, Cold>; -impl, Cold: ItemStore> HotColdDB { +impl HotColdDB { fn simple_forwards_iterator( &self, column: DBColumn, @@ -116,7 +116,7 @@ impl, Cold: ItemStore> HotColdDB } /// Forwards root iterator that makes use of a slot -> root mapping in the freezer DB. -pub struct FrozenForwardsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { +pub struct FrozenForwardsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { inner: ColumnIter<'a, Vec>, column: DBColumn, next_slot: Slot, @@ -124,9 +124,7 @@ pub struct FrozenForwardsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemS _phantom: PhantomData<(E, Hot, Cold)>, } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> - FrozenForwardsIterator<'a, E, Hot, Cold> -{ +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> FrozenForwardsIterator<'a, E, Hot, Cold> { /// `end_slot` is EXCLUSIVE here. pub fn new( store: &'a HotColdDB, @@ -148,7 +146,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> } } -impl, Cold: ItemStore> Iterator +impl Iterator for FrozenForwardsIterator<'_, E, Hot, Cold> { type Item = Result<(Hash256, Slot)>; @@ -199,7 +197,7 @@ impl Iterator for SimpleForwardsIterator { } /// Fusion of the above two approaches to forwards iteration. Fast and efficient. -pub enum HybridForwardsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { +pub enum HybridForwardsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { PreFinalization { iter: Box>, store: &'a HotColdDB, @@ -220,9 +218,7 @@ pub enum HybridForwardsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemSto Finished, } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> - HybridForwardsIterator<'a, E, Hot, Cold> -{ +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> HybridForwardsIterator<'a, E, Hot, Cold> { /// Construct a new hybrid iterator. /// /// The `get_state` closure should return a beacon state and final block/state root to backtrack @@ -349,7 +345,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> } } -impl, Cold: ItemStore> Iterator +impl Iterator for HybridForwardsIterator<'_, E, Hot, Cold> { type Item = Result<(Hash256, Slot)>; diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index e9b9de76e6..a625a97004 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -49,7 +49,7 @@ use zstd::{Decoder, Encoder}; /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores /// intermittent "restore point" states pre-finalization. #[derive(Debug)] -pub struct HotColdDB, Cold: ItemStore> { +pub struct HotColdDB { /// The slot and state root at the point where the database is split between hot and cold. /// /// States with slots less than `split.slot` are in the cold DB, while states with slots @@ -217,11 +217,11 @@ pub enum HotColdDBError { Rollback, } -impl HotColdDB, MemoryStore> { +impl HotColdDB { pub fn open_ephemeral( config: StoreConfig, spec: Arc, - ) -> Result, MemoryStore>, Error> { + ) -> Result, Error> { config.verify::()?; let hierarchy = config.hierarchy_config.to_moduli()?; @@ -258,7 +258,7 @@ impl HotColdDB, MemoryStore> { } } -impl HotColdDB, BeaconNodeBackend> { +impl HotColdDB { /// Open a new or existing database, with the given paths to the hot and cold DBs. /// /// The `migrate_schema` function is passed in so that the parent `BeaconChain` can provide @@ -451,7 +451,7 @@ impl HotColdDB, BeaconNodeBackend> { } } -impl, Cold: ItemStore> HotColdDB { +impl HotColdDB { fn cold_storage_strategy(&self, slot: Slot) -> Result { // The start slot for the freezer HDiff is always 0 Ok(self.hierarchy.storage_strategy(slot, Slot::new(0))?) @@ -3575,7 +3575,7 @@ impl, Cold: ItemStore> HotColdDB /// This function previously did a combination of freezer migration alongside pruning. Now it is /// *just* responsible for copying relevant data to the freezer, while pruning is implemented /// in `prune_hot_db`. -pub fn migrate_database, Cold: ItemStore>( +pub fn migrate_database( store: Arc>, finalized_state_root: Hash256, finalized_block_root: Hash256, @@ -3786,7 +3786,7 @@ pub enum StateSummaryIteratorError { /// Return the ancestor state root of a state beyond SlotsPerHistoricalRoot using the roots iterator /// and the store -pub fn get_ancestor_state_root<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore>( +pub fn get_ancestor_state_root<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore>( store: &'a HotColdDB, from_state: &'a BeaconState, target_slot: Slot, @@ -3993,7 +3993,7 @@ impl StoreItem for HotStateSummary { impl HotStateSummary { /// Construct a new summary of the given state. - pub fn new, Cold: ItemStore>( + pub fn new( store: &HotColdDB, state_root: Hash256, state: &BeaconState, diff --git a/beacon_node/store/src/invariants.rs b/beacon_node/store/src/invariants.rs index d251fb8800..4ec72b82bd 100644 --- a/beacon_node/store/src/invariants.rs +++ b/beacon_node/store/src/invariants.rs @@ -242,7 +242,7 @@ pub enum InvariantViolation { ColdStateBaseSummaryMissing { slot: Slot, base_slot: Slot }, } -impl, Cold: ItemStore> HotColdDB { +impl HotColdDB { /// Run all database invariant checks. /// /// The `ctx` parameter provides data from the beacon chain layer (fork choice, state cache, diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index 0cb803d1ed..cf1ab86ffe 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -13,12 +13,12 @@ use types::{ /// /// It is assumed that all ancestors for this object are stored in the database. If this is not the /// case, the iterator will start returning `None` prior to genesis. -pub trait AncestorIter<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore, I: Iterator> { +pub trait AncestorIter<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore, I: Iterator> { /// Returns an iterator over the roots of the ancestors of `self`. fn try_iter_ancestor_roots(&self, store: &'a HotColdDB) -> Option; } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> AncestorIter<'a, E, Hot, Cold, BlockRootsIterator<'a, E, Hot, Cold>> for SignedBeaconBlock { /// Iterates across all available prior block roots of `self`, starting at the most recent and ending @@ -37,7 +37,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> } } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> AncestorIter<'a, E, Hot, Cold, StateRootsIterator<'a, E, Hot, Cold>> for BeaconState { /// Iterates across all available prior state roots of `self`, starting at the most recent and ending @@ -51,13 +51,11 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> } } -pub struct StateRootsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { +pub struct StateRootsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { inner: RootsIterator<'a, E, Hot, Cold>, } -impl, Cold: ItemStore> Clone - for StateRootsIterator<'_, E, Hot, Cold> -{ +impl Clone for StateRootsIterator<'_, E, Hot, Cold> { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -65,7 +63,7 @@ impl, Cold: ItemStore> Clone } } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> StateRootsIterator<'a, E, Hot, Cold> { +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> StateRootsIterator<'a, E, Hot, Cold> { pub fn new(store: &'a HotColdDB, beacon_state: &'a BeaconState) -> Self { Self { inner: RootsIterator::new(store, beacon_state), @@ -79,7 +77,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> StateRootsIterator<' } } -impl, Cold: ItemStore> Iterator +impl Iterator for StateRootsIterator<'_, E, Hot, Cold> { type Item = Result<(Hash256, Slot), Error>; @@ -99,13 +97,11 @@ impl, Cold: ItemStore> Iterator /// exhausted. /// /// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. -pub struct BlockRootsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { +pub struct BlockRootsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { inner: RootsIterator<'a, E, Hot, Cold>, } -impl, Cold: ItemStore> Clone - for BlockRootsIterator<'_, E, Hot, Cold> -{ +impl Clone for BlockRootsIterator<'_, E, Hot, Cold> { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -113,7 +109,7 @@ impl, Cold: ItemStore> Clone } } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockRootsIterator<'a, E, Hot, Cold> { +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockRootsIterator<'a, E, Hot, Cold> { /// Create a new iterator over all block roots in the given `beacon_state` and prior states. pub fn new(store: &'a HotColdDB, beacon_state: &'a BeaconState) -> Self { Self { @@ -138,7 +134,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockRootsIterator<' } } -impl, Cold: ItemStore> Iterator +impl Iterator for BlockRootsIterator<'_, E, Hot, Cold> { type Item = Result<(Hash256, Slot), Error>; @@ -151,13 +147,13 @@ impl, Cold: ItemStore> Iterator } /// Iterator over state and block roots that backtracks using the vectors from a `BeaconState`. -pub struct RootsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { +pub struct RootsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { store: &'a HotColdDB, beacon_state: Cow<'a, BeaconState>, slot: Slot, } -impl, Cold: ItemStore> Clone for RootsIterator<'_, E, Hot, Cold> { +impl Clone for RootsIterator<'_, E, Hot, Cold> { fn clone(&self) -> Self { Self { store: self.store, @@ -167,7 +163,7 @@ impl, Cold: ItemStore> Clone for RootsIterator< } } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> RootsIterator<'a, E, Hot, Cold> { +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> RootsIterator<'a, E, Hot, Cold> { pub fn new(store: &'a HotColdDB, beacon_state: &'a BeaconState) -> Self { Self { store, @@ -234,9 +230,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> RootsIterator<'a, E, } } -impl, Cold: ItemStore> Iterator - for RootsIterator<'_, E, Hot, Cold> -{ +impl Iterator for RootsIterator<'_, E, Hot, Cold> { /// (block_root, state_root, slot) type Item = Result<(Hash256, Hash256, Slot), Error>; @@ -246,15 +240,13 @@ impl, Cold: ItemStore> Iterator } /// Block iterator that uses the `parent_root` of each block to backtrack. -pub struct ParentRootBlockIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { +pub struct ParentRootBlockIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { store: &'a HotColdDB, next_block_root: Hash256, _phantom: PhantomData, } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> - ParentRootBlockIterator<'a, E, Hot, Cold> -{ +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> ParentRootBlockIterator<'a, E, Hot, Cold> { pub fn new(store: &'a HotColdDB, start_block_root: Hash256) -> Self { Self { store, @@ -283,7 +275,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> } } -impl, Cold: ItemStore> Iterator +impl Iterator for ParentRootBlockIterator<'_, E, Hot, Cold> { type Item = Result<(Hash256, SignedBeaconBlock>), Error>; @@ -295,11 +287,11 @@ impl, Cold: ItemStore> Iterator #[derive(Clone)] /// Extends `BlockRootsIterator`, returning `SignedBeaconBlock` instances, instead of their roots. -pub struct BlockIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { +pub struct BlockIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { roots: BlockRootsIterator<'a, E, Hot, Cold>, } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockIterator<'a, E, Hot, Cold> { +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockIterator<'a, E, Hot, Cold> { /// Create a new iterator over all blocks in the given `beacon_state` and prior states. pub fn new(store: &'a HotColdDB, beacon_state: &'a BeaconState) -> Self { Self { @@ -324,9 +316,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockIterator<'a, E, } } -impl, Cold: ItemStore> Iterator - for BlockIterator<'_, E, Hot, Cold> -{ +impl Iterator for BlockIterator<'_, E, Hot, Cold> { type Item = Result>, Error>; fn next(&mut self) -> Option { @@ -338,7 +328,7 @@ impl, Cold: ItemStore> Iterator /// /// Return `Err(HistoryUnavailable)` in the case where no more backtrack states are available /// due to weak subjectivity sync. -fn next_historical_root_backtrack_state, Cold: ItemStore>( +fn next_historical_root_backtrack_state( store: &HotColdDB, current_state: &BeaconState, ) -> Result, Error> { @@ -386,7 +376,7 @@ mod test { harness.get_current_state() } - fn get_store() -> HotColdDB, MemoryStore> { + fn get_store() -> HotColdDB { let store = HotColdDB::open_ephemeral(Config::default(), Arc::new(E::default_spec())).unwrap(); // Init achor info so anchor slot is set. Use a random block as it is only used for the diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index bd8caa3ad5..56cdd18fbe 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -46,7 +46,7 @@ pub type ColumnKeyIter<'a, K> = Box> + 'a>; pub type RawEntryIter<'a> = Result, Vec), Error>> + 'a>, Error>; -pub trait KeyValueStore: Sync + Send + Sized + 'static { +pub trait KeyValueStore: Sync + Send + Sized + 'static { /// Retrieve some bytes in `column` with `key`. fn get_bytes(&self, column: DBColumn, key: &[u8]) -> Result>, Error>; @@ -177,7 +177,7 @@ pub enum KeyValueStoreOp { DeleteKey(DBColumn, Vec), } -pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'static { +pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'static { /// Store an item in `Self`. fn put(&self, key: &Hash256, item: &I) -> Result<(), Error> { let column = I::db_column(); @@ -493,7 +493,7 @@ mod tests { } } - fn test_impl(store: impl ItemStore) { + fn test_impl(store: impl ItemStore) { let key = Hash256::random(); let item = StorableThing { a: 1, b: 42 }; @@ -531,7 +531,7 @@ mod tests { #[test] fn exists() { - let store = MemoryStore::::open(); + let store = MemoryStore::open(); let key = Hash256::random(); let item = StorableThing { a: 1, b: 42 }; diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 6baef61c9d..8be9278d90 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -4,28 +4,24 @@ use crate::{ }; use parking_lot::RwLock; use std::collections::{BTreeMap, HashSet}; -use std::marker::PhantomData; -use types::*; type DBMap = BTreeMap>; /// A thread-safe `BTreeMap` wrapper. -pub struct MemoryStore { +pub struct MemoryStore { db: RwLock, - _phantom: PhantomData, } -impl MemoryStore { +impl MemoryStore { /// Create a new, empty database. pub fn open() -> Self { Self { db: RwLock::new(BTreeMap::new()), - _phantom: PhantomData, } } } -impl KeyValueStore for MemoryStore { +impl KeyValueStore for MemoryStore { /// Get the value of some key from the database. Returns `None` if the key does not exist. fn get_bytes(&self, col: DBColumn, key: &[u8]) -> Result>, Error> { let column_key = BytesKey::from_vec(get_key_for_col(col, key)); @@ -148,4 +144,4 @@ impl KeyValueStore for MemoryStore { } } -impl ItemStore for MemoryStore {} +impl ItemStore for MemoryStore {} diff --git a/beacon_node/store/src/reconstruct.rs b/beacon_node/store/src/reconstruct.rs index 04a519af02..2fb40daa0d 100644 --- a/beacon_node/store/src/reconstruct.rs +++ b/beacon_node/store/src/reconstruct.rs @@ -15,8 +15,8 @@ use types::{EthSpec, Slot}; impl HotColdDB where E: EthSpec, - Hot: ItemStore, - Cold: ItemStore, + Hot: ItemStore, + Cold: ItemStore, { pub fn reconstruct_historic_states( self: &Arc, diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 353893026b..848834b4d8 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -79,7 +79,7 @@ impl ForkChoiceTest { /// Get a value from the `ForkChoice` instantiation. fn get(&self, func: T) -> U where - T: Fn(&BeaconForkChoiceStore, MemoryStore>) -> U, + T: Fn(&BeaconForkChoiceStore) -> U, { func( self.harness diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 608400fa7e..2509b500e0 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -55,7 +55,7 @@ pub fn display_db_version( let blobs_path = client_config.get_blobs_db_path(); let mut version = CURRENT_SCHEMA_VERSION; - HotColdDB::, BeaconNodeBackend>::open( + HotColdDB::::open( &hot_path, &cold_path, &blobs_path, @@ -143,13 +143,13 @@ pub fn inspect_db( let mut num_keys = 0; let sub_db = if inspect_config.freezer { - BeaconNodeBackend::::open(&client_config.store, &cold_path) + BeaconNodeBackend::open(&client_config.store, &cold_path) .map_err(|e| format!("Unable to open freezer DB: {e:?}"))? } else if inspect_config.blobs_db { - BeaconNodeBackend::::open(&client_config.store, &blobs_path) + BeaconNodeBackend::open(&client_config.store, &blobs_path) .map_err(|e| format!("Unable to open blobs DB: {e:?}"))? } else { - BeaconNodeBackend::::open(&client_config.store, &hot_path) + BeaconNodeBackend::open(&client_config.store, &hot_path) .map_err(|e| format!("Unable to open hot DB: {e:?}"))? }; @@ -264,17 +264,17 @@ pub fn compact_db( let (sub_db, db_name) = if compact_config.freezer { ( - BeaconNodeBackend::::open(&client_config.store, &cold_path)?, + BeaconNodeBackend::open(&client_config.store, &cold_path)?, "freezer_db", ) } else if compact_config.blobs_db { ( - BeaconNodeBackend::::open(&client_config.store, &blobs_path)?, + BeaconNodeBackend::open(&client_config.store, &blobs_path)?, "blobs_db", ) } else { ( - BeaconNodeBackend::::open(&client_config.store, &hot_path)?, + BeaconNodeBackend::open(&client_config.store, &hot_path)?, "hot_db", ) }; @@ -309,7 +309,7 @@ pub fn migrate_db( let mut from = CURRENT_SCHEMA_VERSION; let to = migrate_config.to; - let db = HotColdDB::, BeaconNodeBackend>::open( + let db = HotColdDB::::open( &hot_path, &cold_path, &blobs_path, @@ -339,7 +339,7 @@ pub fn prune_payloads( let cold_path = client_config.get_freezer_db_path(); let blobs_path = client_config.get_blobs_db_path(); - let db = HotColdDB::, BeaconNodeBackend>::open( + let db = HotColdDB::::open( &hot_path, &cold_path, &blobs_path, @@ -363,7 +363,7 @@ pub fn prune_blobs( let cold_path = client_config.get_freezer_db_path(); let blobs_path = client_config.get_blobs_db_path(); - let db = HotColdDB::, BeaconNodeBackend>::open( + let db = HotColdDB::::open( &hot_path, &cold_path, &blobs_path, @@ -398,7 +398,7 @@ pub fn prune_states( let cold_path = client_config.get_freezer_db_path(); let blobs_path = client_config.get_blobs_db_path(); - let db = HotColdDB::, BeaconNodeBackend>::open( + let db = HotColdDB::::open( &hot_path, &cold_path, &blobs_path,