diff --git a/Cargo.lock b/Cargo.lock index b11b585173..4bf6c7cb0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1669,9 +1669,9 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.14" +version = "0.5.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" dependencies = [ "crossbeam-utils", ] @@ -2545,6 +2545,7 @@ dependencies = [ "multiaddr", "pretty_reqwest_error", "proto_array", + "rand 0.8.5", "reqwest", "reqwest-eventsource", "sensitive_url", @@ -2552,6 +2553,7 @@ dependencies = [ "serde_json", "slashing_protection", "ssz_types", + "test_random_derive", "tokio", "types", "zeroize", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 4d8e94f86d..64ef5ef17e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -4007,7 +4007,7 @@ impl BeaconChain { &mut state, ) .unwrap_or_else(|e| { - error!("error caching light_client data {:?}", e); + debug!("error caching light_client data {:?}", e); }); } @@ -7135,6 +7135,31 @@ impl BeaconChain { } } } + + /// Retrieves block roots (in ascending slot order) within some slot range from fork choice. + pub fn block_roots_from_fork_choice(&self, start_slot: u64, count: u64) -> Vec { + let head_block_root = self.canonical_head.cached_head().head_block_root(); + let fork_choice_read_lock = self.canonical_head.fork_choice_read_lock(); + let block_roots_iter = fork_choice_read_lock + .proto_array() + .iter_block_roots(&head_block_root); + let end_slot = start_slot.saturating_add(count); + let mut roots = vec![]; + + for (root, slot) in block_roots_iter { + if slot < end_slot && slot >= start_slot { + roots.push(root); + } + if slot < start_slot { + break; + } + } + + drop(fork_choice_read_lock); + // return in ascending slot order + roots.reverse(); + roots + } } impl Drop for BeaconChain { diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index 43429b726c..8a79bff4c7 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -362,6 +362,12 @@ pub struct DummyEth1ChainBackend(PhantomData); impl Eth1ChainBackend for DummyEth1ChainBackend { /// Produce some deterministic junk based upon the current epoch. fn eth1_data(&self, state: &BeaconState, _spec: &ChainSpec) -> Result { + // [New in Electra:EIP6110] + if let Ok(deposit_requests_start_index) = state.deposit_requests_start_index() { + if state.eth1_deposit_index() == deposit_requests_start_index { + return Ok(state.eth1_data().clone()); + } + } let current_epoch = state.current_epoch(); let slots_per_voting_period = E::slots_per_eth1_voting_period() as u64; let current_voting_period: u64 = current_epoch.as_u64() / slots_per_voting_period; @@ -456,6 +462,12 @@ impl CachingEth1Backend { impl Eth1ChainBackend for CachingEth1Backend { fn eth1_data(&self, state: &BeaconState, spec: &ChainSpec) -> Result { + // [New in Electra:EIP6110] + if let Ok(deposit_requests_start_index) = state.deposit_requests_start_index() { + if state.eth1_deposit_index() == deposit_requests_start_index { + return Ok(state.eth1_data().clone()); + } + } let period = E::SlotsPerEth1VotingPeriod::to_u64(); let voting_period_start_slot = (state.slot() / period) * period; let voting_period_start_seconds = slot_start_seconds( diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index ee326f22cd..bbdf1a054b 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -210,6 +210,7 @@ pub enum BlockProposalContents> { /// `None` for blinded `PayloadAndBlobs`. blobs_and_proofs: Option<(BlobsList, KzgProofs)>, // TODO(electra): this should probably be a separate variant/superstruct + // See: https://github.com/sigp/lighthouse/issues/6981 requests: Option>, }, } diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index b13517f27e..afc68ad96d 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -28,6 +28,7 @@ metrics = { workspace = true } network = { workspace = true } operation_pool = { workspace = true } parking_lot = { workspace = true } +proto_array = { workspace = true } rand = { workspace = true } safe_arith = { workspace = true } sensitive_url = { workspace = true } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index eb188a9b19..412b756684 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -68,6 +68,7 @@ use serde_json::Value; use slot_clock::SlotClock; use ssz::Encode; pub use state_id::StateId; +use std::collections::HashSet; use std::future::Future; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; @@ -86,13 +87,14 @@ use tokio_stream::{ StreamExt, }; use tracing::{debug, error, info, warn}; +use types::AttestationData; use types::{ - fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId, - AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset, - Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, - ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, - SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData, - SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, + fork_versioned_response::EmptyMetadata, Attestation, AttestationShufflingId, AttesterSlashing, + BeaconStateError, ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, + ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, + RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, + SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, + SyncCommitteeMessage, SyncContributionData, }; use validator::pubkey_to_validator_index; use version::{ @@ -1145,6 +1147,39 @@ pub fn serve( }, ); + // GET beacon/states/{state_id}/pending_consolidations + let get_beacon_state_pending_consolidations = beacon_states_path + .clone() + .and(warp::path("pending_consolidations")) + .and(warp::path::end()) + .then( + |state_id: StateId, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let (data, execution_optimistic, finalized) = state_id + .map_state_and_execution_optimistic_and_finalized( + &chain, + |state, execution_optimistic, finalized| { + let Ok(consolidations) = state.pending_consolidations() else { + return Err(warp_utils::reject::custom_bad_request( + "Pending consolidations not found".to_string(), + )); + }; + + Ok((consolidations.clone(), execution_optimistic, finalized)) + }, + )?; + + Ok(api_types::ExecutionOptimisticFinalizedResponse { + data, + execution_optimistic: Some(execution_optimistic), + finalized: Some(finalized), + }) + }) + }, + ); + // GET beacon/headers // // Note: this endpoint only returns information about blocks in the canonical chain. Given that @@ -1927,11 +1962,11 @@ pub fn serve( chain: Arc>, query: api_types::AttestationPoolQuery| { task_spawner.blocking_response_task(Priority::P1, move || { - let query_filter = |data: &AttestationData| { + let query_filter = |data: &AttestationData, committee_indices: HashSet| { query.slot.is_none_or(|slot| slot == data.slot) && query .committee_index - .is_none_or(|index| index == data.index) + .is_none_or(|index| committee_indices.contains(&index)) }; let mut attestations = chain.op_pool.get_filtered_attestations(query_filter); @@ -1940,7 +1975,9 @@ pub fn serve( .naive_aggregation_pool .read() .iter() - .filter(|&att| query_filter(att.data())) + .filter(|&att| { + query_filter(att.data(), att.get_committee_indices_map()) + }) .cloned(), ); // Use the current slot to find the fork version, and convert all messages to the @@ -4737,6 +4774,7 @@ pub fn serve( .uor(get_beacon_state_randao) .uor(get_beacon_state_pending_deposits) .uor(get_beacon_state_pending_partial_withdrawals) + .uor(get_beacon_state_pending_consolidations) .uor(get_beacon_headers) .uor(get_beacon_headers_block_id) .uor(get_beacon_block) diff --git a/beacon_node/http_api/src/light_client.rs b/beacon_node/http_api/src/light_client.rs index ac8c08581c..2d0a5d09a1 100644 --- a/beacon_node/http_api/src/light_client.rs +++ b/beacon_node/http_api/src/light_client.rs @@ -4,7 +4,7 @@ use crate::version::{ use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2::types::{ self as api_types, ChainSpec, ForkVersionedResponse, LightClientUpdate, - LightClientUpdateResponseChunk, LightClientUpdateSszResponse, LightClientUpdatesQuery, + LightClientUpdateResponseChunk, LightClientUpdateResponseChunkInner, LightClientUpdatesQuery, }; use ssz::Encode; use std::sync::Arc; @@ -37,15 +37,9 @@ pub fn get_light_client_updates( .map(|update| map_light_client_update_to_ssz_chunk::(&chain, update)) .collect::>(); - let ssz_response = LightClientUpdateSszResponse { - response_chunk_len: (light_client_updates.len() as u64).to_le_bytes().to_vec(), - response_chunk: response_chunks.as_ssz_bytes(), - } - .as_ssz_bytes(); - Response::builder() .status(200) - .body(ssz_response) + .body(response_chunks.as_ssz_bytes()) .map(|res: Response>| add_ssz_content_type_header(res)) .map_err(|e| { warp_utils::reject::custom_server_error(format!( @@ -159,16 +153,24 @@ fn map_light_client_update_to_ssz_chunk( ) -> LightClientUpdateResponseChunk { let fork_name = chain .spec - .fork_name_at_slot::(*light_client_update.signature_slot()); + .fork_name_at_slot::(light_client_update.attested_header_slot()); let fork_digest = ChainSpec::compute_fork_digest( chain.spec.fork_version_for_name(fork_name), chain.genesis_validators_root, ); - LightClientUpdateResponseChunk { + let payload = light_client_update.as_ssz_bytes(); + let response_chunk_len = fork_digest.len() + payload.len(); + + let response_chunk = LightClientUpdateResponseChunkInner { context: fork_digest, - payload: light_client_update.as_ssz_bytes(), + payload, + }; + + LightClientUpdateResponseChunk { + response_chunk_len: response_chunk_len as u64, + response_chunk, } } diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 6d407d2742..31fecfeb99 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -27,6 +27,7 @@ use http_api::{ }; use lighthouse_network::{types::SyncState, Enr, EnrExt, PeerId}; use network::NetworkReceivers; +use operation_pool::attestation_storage::CheckpointKey; use proto_array::ExecutionStatus; use sensitive_url::SensitiveUrl; use slot_clock::SlotClock; @@ -1241,6 +1242,33 @@ impl ApiTester { self } + pub async fn test_beacon_states_pending_consolidations(self) -> Self { + for state_id in self.interesting_state_ids() { + let mut state_opt = state_id + .state(&self.chain) + .ok() + .map(|(state, _execution_optimistic, _finalized)| state); + + let result = self + .client + .get_beacon_states_pending_consolidations(state_id.0) + .await + .unwrap() + .map(|res| res.data); + + if result.is_none() && state_opt.is_none() { + continue; + } + + let state = state_opt.as_mut().expect("result should be none"); + let expected = state.pending_consolidations().unwrap(); + + assert_eq!(result.unwrap(), expected.to_vec()); + } + + self + } + pub async fn test_beacon_headers_all_slots(self) -> Self { for slot in 0..CHAIN_LENGTH { let slot = Slot::from(slot); @@ -2087,7 +2115,7 @@ impl ApiTester { self } - pub async fn test_get_beacon_pool_attestations(self) -> Self { + pub async fn test_get_beacon_pool_attestations(self) { let result = self .client .get_beacon_pool_attestations_v1(None, None) @@ -2106,9 +2134,80 @@ impl ApiTester { .await .unwrap() .data; + assert_eq!(result, expected); - self + let result_committee_index_filtered = self + .client + .get_beacon_pool_attestations_v1(None, Some(0)) + .await + .unwrap() + .data; + + let expected_committee_index_filtered = expected + .clone() + .into_iter() + .filter(|att| att.get_committee_indices_map().contains(&0)) + .collect::>(); + + assert_eq!( + result_committee_index_filtered, + expected_committee_index_filtered + ); + + let result_committee_index_filtered = self + .client + .get_beacon_pool_attestations_v1(None, Some(1)) + .await + .unwrap() + .data; + + let expected_committee_index_filtered = expected + .clone() + .into_iter() + .filter(|att| att.get_committee_indices_map().contains(&1)) + .collect::>(); + + assert_eq!( + result_committee_index_filtered, + expected_committee_index_filtered + ); + + let fork_name = self + .harness + .chain + .spec + .fork_name_at_slot::(self.harness.chain.slot().unwrap()); + + // aggregate electra attestations + if fork_name.electra_enabled() { + // Take and drop the lock in a block to avoid clippy complaining + // about taking locks across await points + { + let mut all_attestations = self.chain.op_pool.attestations.write(); + let (prev_epoch_key, curr_epoch_key) = + CheckpointKey::keys_for_state(&self.harness.get_current_state()); + all_attestations.aggregate_across_committees(prev_epoch_key); + all_attestations.aggregate_across_committees(curr_epoch_key); + } + let result_committee_index_filtered = self + .client + .get_beacon_pool_attestations_v2(None, Some(0)) + .await + .unwrap() + .data; + let mut expected = self.chain.op_pool.get_all_attestations(); + expected.extend(self.chain.naive_aggregation_pool.read().iter().cloned()); + let expected_committee_index_filtered = expected + .clone() + .into_iter() + .filter(|att| att.get_committee_indices_map().contains(&0)) + .collect::>(); + assert_eq!( + result_committee_index_filtered, + expected_committee_index_filtered + ); + } } pub async fn test_post_beacon_pool_attester_slashings_valid_v1(mut self) -> Self { @@ -6386,6 +6485,8 @@ async fn beacon_get_state_info_electra() { .test_beacon_states_pending_deposits() .await .test_beacon_states_pending_partial_withdrawals() + .await + .test_beacon_states_pending_consolidations() .await; } @@ -6416,10 +6517,30 @@ async fn beacon_get_blocks() { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn beacon_get_pools() { +async fn test_beacon_pool_attestations_electra() { + let mut config = ApiTesterConfig::default(); + config.spec.altair_fork_epoch = Some(Epoch::new(0)); + config.spec.bellatrix_fork_epoch = Some(Epoch::new(0)); + config.spec.capella_fork_epoch = Some(Epoch::new(0)); + config.spec.deneb_fork_epoch = Some(Epoch::new(0)); + config.spec.electra_fork_epoch = Some(Epoch::new(0)); + ApiTester::new_from_config(config) + .await + .test_get_beacon_pool_attestations() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_beacon_pool_attestations_base() { ApiTester::new() .await .test_get_beacon_pool_attestations() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn beacon_get_pools() { + ApiTester::new() .await .test_get_beacon_pool_attester_slashings() .await diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs index 5a6628439e..89d260569a 100644 --- a/beacon_node/lighthouse_network/src/config.rs +++ b/beacon_node/lighthouse_network/src/config.rs @@ -14,7 +14,7 @@ use std::num::NonZeroU16; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; -use types::{ForkContext, ForkName}; +use types::ForkContext; pub const DEFAULT_IPV4_ADDRESS: Ipv4Addr = Ipv4Addr::UNSPECIFIED; pub const DEFAULT_TCP_PORT: u16 = 9000u16; @@ -22,18 +22,9 @@ pub const DEFAULT_DISC_PORT: u16 = 9000u16; pub const DEFAULT_QUIC_PORT: u16 = 9001u16; pub const DEFAULT_IDONTWANT_MESSAGE_SIZE_THRESHOLD: usize = 1000usize; -/// The maximum size of gossip messages. -pub fn gossip_max_size(is_merge_enabled: bool, gossip_max_size: usize) -> usize { - if is_merge_enabled { - gossip_max_size - } else { - gossip_max_size / 10 - } -} - pub struct GossipsubConfigParams { pub message_domain_valid_snappy: [u8; 4], - pub gossip_max_size: usize, + pub gossipsub_max_transmit_size: usize, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -480,7 +471,6 @@ pub fn gossipsub_config( } } let message_domain_valid_snappy = gossipsub_config_params.message_domain_valid_snappy; - let is_bellatrix_enabled = fork_context.fork_exists(ForkName::Bellatrix); let gossip_message_id = move |message: &gossipsub::Message| { gossipsub::MessageId::from( &Sha256::digest( @@ -499,10 +489,7 @@ pub fn gossipsub_config( let duplicate_cache_time = Duration::from_secs(slots_per_epoch * seconds_per_slot * 2); gossipsub::ConfigBuilder::default() - .max_transmit_size(gossip_max_size( - is_bellatrix_enabled, - gossipsub_config_params.gossip_max_size, - )) + .max_transmit_size(gossipsub_config_params.gossipsub_max_transmit_size) .heartbeat_interval(load.heartbeat_interval) .mesh_n(load.mesh_n) .mesh_n_low(load.mesh_n_low) diff --git a/beacon_node/lighthouse_network/src/lib.rs b/beacon_node/lighthouse_network/src/lib.rs index dbeb0c2c2b..40fdd71b38 100644 --- a/beacon_node/lighthouse_network/src/lib.rs +++ b/beacon_node/lighthouse_network/src/lib.rs @@ -12,7 +12,6 @@ pub mod peer_manager; pub mod rpc; pub mod types; -pub use config::gossip_max_size; use libp2p::swarm::DialError; pub use listen_addr::*; diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index a6c8efd6a6..2612172e61 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -1002,10 +1002,7 @@ mod tests { } /// Bellatrix block with length < max_rpc_size. - fn bellatrix_block_small( - fork_context: &ForkContext, - spec: &ChainSpec, - ) -> SignedBeaconBlock { + fn bellatrix_block_small(spec: &ChainSpec) -> SignedBeaconBlock { let mut block: BeaconBlockBellatrix<_, FullPayload> = BeaconBlockBellatrix::empty(&Spec::default_spec()); @@ -1015,17 +1012,14 @@ mod tests { block.body.execution_payload.execution_payload.transactions = txs; let block = BeaconBlock::Bellatrix(block); - assert!(block.ssz_bytes_len() <= max_rpc_size(fork_context, spec.max_chunk_size as usize)); + assert!(block.ssz_bytes_len() <= spec.max_payload_size as usize); SignedBeaconBlock::from_block(block, Signature::empty()) } /// Bellatrix block with length > MAX_RPC_SIZE. /// The max limit for a Bellatrix block is in the order of ~16GiB which wouldn't fit in memory. /// Hence, we generate a Bellatrix block just greater than `MAX_RPC_SIZE` to test rejection on the rpc layer. - fn bellatrix_block_large( - fork_context: &ForkContext, - spec: &ChainSpec, - ) -> SignedBeaconBlock { + fn bellatrix_block_large(spec: &ChainSpec) -> SignedBeaconBlock { let mut block: BeaconBlockBellatrix<_, FullPayload> = BeaconBlockBellatrix::empty(&Spec::default_spec()); @@ -1035,7 +1029,7 @@ mod tests { block.body.execution_payload.execution_payload.transactions = txs; let block = BeaconBlock::Bellatrix(block); - assert!(block.ssz_bytes_len() > max_rpc_size(fork_context, spec.max_chunk_size as usize)); + assert!(block.ssz_bytes_len() > spec.max_payload_size as usize); SignedBeaconBlock::from_block(block, Signature::empty()) } @@ -1143,7 +1137,7 @@ mod tests { ) -> Result { let snappy_protocol_id = ProtocolId::new(protocol, Encoding::SSZSnappy); let fork_context = Arc::new(fork_context(fork_name)); - let max_packet_size = max_rpc_size(&fork_context, spec.max_chunk_size as usize); + let max_packet_size = spec.max_payload_size as usize; let mut buf = BytesMut::new(); let mut snappy_inbound_codec = @@ -1190,7 +1184,7 @@ mod tests { ) -> Result>, RPCError> { let snappy_protocol_id = ProtocolId::new(protocol, Encoding::SSZSnappy); let fork_context = Arc::new(fork_context(fork_name)); - let max_packet_size = max_rpc_size(&fork_context, spec.max_chunk_size as usize); + let max_packet_size = spec.max_payload_size as usize; let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new(snappy_protocol_id, max_packet_size, fork_context); // decode message just as snappy message @@ -1211,7 +1205,7 @@ mod tests { /// Verifies that requests we send are encoded in a way that we would correctly decode too. fn encode_then_decode_request(req: RequestType, fork_name: ForkName, spec: &ChainSpec) { let fork_context = Arc::new(fork_context(fork_name)); - let max_packet_size = max_rpc_size(&fork_context, spec.max_chunk_size as usize); + let max_packet_size = spec.max_payload_size as usize; let protocol = ProtocolId::new(req.versioned_protocol(), Encoding::SSZSnappy); // Encode a request we send let mut buf = BytesMut::new(); @@ -1588,10 +1582,8 @@ mod tests { )))) ); - let bellatrix_block_small = - bellatrix_block_small(&fork_context(ForkName::Bellatrix), &chain_spec); - let bellatrix_block_large = - bellatrix_block_large(&fork_context(ForkName::Bellatrix), &chain_spec); + let bellatrix_block_small = bellatrix_block_small(&chain_spec); + let bellatrix_block_large = bellatrix_block_large(&chain_spec); assert_eq!( encode_then_decode_response( @@ -2091,7 +2083,7 @@ mod tests { // Insert length-prefix uvi_codec - .encode(chain_spec.max_chunk_size as usize + 1, &mut dst) + .encode(chain_spec.max_payload_size as usize + 1, &mut dst) .unwrap(); // Insert snappy stream identifier @@ -2129,7 +2121,7 @@ mod tests { let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( snappy_protocol_id, - max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize), + chain_spec.max_payload_size as usize, fork_context, ); @@ -2165,7 +2157,7 @@ mod tests { let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( snappy_protocol_id, - max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize), + chain_spec.max_payload_size as usize, fork_context, ); @@ -2194,7 +2186,7 @@ mod tests { let chain_spec = Spec::default_spec(); - let max_rpc_size = max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize); + let max_rpc_size = chain_spec.max_payload_size as usize; let limit = protocol_id.rpc_response_limits::(&fork_context); let mut max = encode_len(limit.max + 1); let mut codec = SSZSnappyOutboundCodec::::new( diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 1156447d56..0f23da7f38 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -33,7 +33,7 @@ pub use methods::{ BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest, ResponseTermination, RpcErrorResponse, StatusMessage, }; -pub use protocol::{max_rpc_size, Protocol, RPCError}; +pub use protocol::{Protocol, RPCError}; use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig}; use self::protocol::RPCProtocol; @@ -136,7 +136,7 @@ pub struct RPCMessage { type BehaviourAction = ToSwarm, RPCSend>; pub struct NetworkParams { - pub max_chunk_size: usize, + pub max_payload_size: usize, pub ttfb_timeout: Duration, pub resp_timeout: Duration, } @@ -306,7 +306,7 @@ where let protocol = SubstreamProtocol::new( RPCProtocol { fork_context: self.fork_context.clone(), - max_rpc_size: max_rpc_size(&self.fork_context, self.network_params.max_chunk_size), + max_rpc_size: self.fork_context.spec.max_payload_size as usize, enable_light_client_server: self.enable_light_client_server, phantom: PhantomData, ttfb_timeout: self.network_params.ttfb_timeout, @@ -336,7 +336,7 @@ where let protocol = SubstreamProtocol::new( RPCProtocol { fork_context: self.fork_context.clone(), - max_rpc_size: max_rpc_size(&self.fork_context, self.network_params.max_chunk_size), + max_rpc_size: self.fork_context.spec.max_payload_size as usize, enable_light_client_server: self.enable_light_client_server, phantom: PhantomData, ttfb_timeout: self.network_params.ttfb_timeout, diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index eac7d67490..8fc1e9a5f4 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -57,7 +57,7 @@ pub static SIGNED_BEACON_BLOCK_ALTAIR_MAX: LazyLock = LazyLock::new(|| { /// The `BeaconBlockBellatrix` block has an `ExecutionPayload` field which has a max size ~16 GiB for future proofing. /// We calculate the value from its fields instead of constructing the block and checking the length. /// Note: This is only the theoretical upper bound. We further bound the max size we receive over the network -/// with `max_chunk_size`. +/// with `max_payload_size`. pub static SIGNED_BEACON_BLOCK_BELLATRIX_MAX: LazyLock = LazyLock::new(|| // Size of a full altair block *SIGNED_BEACON_BLOCK_ALTAIR_MAX @@ -122,15 +122,6 @@ const PROTOCOL_PREFIX: &str = "/eth2/beacon_chain/req"; /// established before the stream is terminated. const REQUEST_TIMEOUT: u64 = 15; -/// Returns the maximum bytes that can be sent across the RPC. -pub fn max_rpc_size(fork_context: &ForkContext, max_chunk_size: usize) -> usize { - if fork_context.current_fork().bellatrix_enabled() { - max_chunk_size - } else { - max_chunk_size / 10 - } -} - /// Returns the rpc limits for beacon_block_by_range and beacon_block_by_root responses. /// /// Note: This function should take care to return the min/max limits accounting for all diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index bc9f2011f8..86da517e21 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -223,7 +223,7 @@ impl Network { let gossipsub_config_params = GossipsubConfigParams { message_domain_valid_snappy: ctx.chain_spec.message_domain_valid_snappy, - gossip_max_size: ctx.chain_spec.gossip_max_size as usize, + gossipsub_max_transmit_size: ctx.chain_spec.max_message_size(), }; let gs_config = gossipsub_config( config.network_load, @@ -334,7 +334,9 @@ impl Network { ) }); - let snappy_transform = SnappyTransform::new(gs_config.max_transmit_size()); + let spec = &ctx.chain_spec; + let snappy_transform = + SnappyTransform::new(spec.max_payload_size as usize, spec.max_compressed_len()); let mut gossipsub = Gossipsub::new_with_subscription_filter_and_transform( MessageAuthenticity::Anonymous, gs_config.clone(), @@ -365,7 +367,7 @@ impl Network { }; let network_params = NetworkParams { - max_chunk_size: ctx.chain_spec.max_chunk_size as usize, + max_payload_size: ctx.chain_spec.max_payload_size as usize, ttfb_timeout: ctx.chain_spec.ttfb_timeout(), resp_timeout: ctx.chain_spec.resp_timeout(), }; diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index c199d2312b..880b387250 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -52,13 +52,16 @@ pub enum PubsubMessage { // Implements the `DataTransform` trait of gossipsub to employ snappy compression pub struct SnappyTransform { /// Sets the maximum size we allow gossipsub messages to decompress to. - max_size_per_message: usize, + max_uncompressed_len: usize, + /// Sets the maximum size we allow for compressed gossipsub message data. + max_compressed_len: usize, } impl SnappyTransform { - pub fn new(max_size_per_message: usize) -> Self { + pub fn new(max_uncompressed_len: usize, max_compressed_len: usize) -> Self { SnappyTransform { - max_size_per_message, + max_uncompressed_len, + max_compressed_len, } } } @@ -69,12 +72,19 @@ impl gossipsub::DataTransform for SnappyTransform { &self, raw_message: gossipsub::RawMessage, ) -> Result { - // check the length of the raw bytes - let len = decompress_len(&raw_message.data)?; - if len > self.max_size_per_message { + // first check the size of the compressed payload + if raw_message.data.len() > self.max_compressed_len { return Err(Error::new( ErrorKind::InvalidData, - "ssz_snappy decoded data > GOSSIP_MAX_SIZE", + "ssz_snappy encoded data > max_compressed_len", + )); + } + // check the length of the uncompressed bytes + let len = decompress_len(&raw_message.data)?; + if len > self.max_uncompressed_len { + return Err(Error::new( + ErrorKind::InvalidData, + "ssz_snappy decoded data > MAX_PAYLOAD_SIZE", )); } @@ -98,10 +108,10 @@ impl gossipsub::DataTransform for SnappyTransform { ) -> Result, std::io::Error> { // Currently we are not employing topic-based compression. Everything is expected to be // snappy compressed. - if data.len() > self.max_size_per_message { + if data.len() > self.max_uncompressed_len { return Err(Error::new( ErrorKind::InvalidData, - "ssz_snappy Encoded data > GOSSIP_MAX_SIZE", + "ssz_snappy Encoded data > MAX_PAYLOAD_SIZE", )); } let mut encoder = Encoder::new(); diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 24565ad9c6..7a0eb4602b 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -5,7 +5,7 @@ mod common; use common::{build_tracing_subscriber, Protocol}; use lighthouse_network::rpc::{methods::*, RequestType}; use lighthouse_network::service::api_types::AppRequestId; -use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Response}; +use lighthouse_network::{NetworkEvent, ReportSource, Response}; use ssz::Encode; use ssz_types::VariableList; use std::sync::Arc; @@ -15,14 +15,14 @@ use tokio::time::sleep; use tracing::{debug, warn}; use types::{ BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockBellatrix, BlobSidecar, ChainSpec, - EmptyBlock, Epoch, EthSpec, FixedBytesExtended, ForkContext, ForkName, Hash256, MinimalEthSpec, + EmptyBlock, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, MinimalEthSpec, RuntimeVariableList, Signature, SignedBeaconBlock, Slot, }; type E = MinimalEthSpec; /// Bellatrix block with length < max_rpc_size. -fn bellatrix_block_small(fork_context: &ForkContext, spec: &ChainSpec) -> BeaconBlock { +fn bellatrix_block_small(spec: &ChainSpec) -> BeaconBlock { let mut block = BeaconBlockBellatrix::::empty(spec); let tx = VariableList::from(vec![0; 1024]); let txs = VariableList::from(std::iter::repeat_n(tx, 5000).collect::>()); @@ -30,14 +30,14 @@ fn bellatrix_block_small(fork_context: &ForkContext, spec: &ChainSpec) -> Beacon block.body.execution_payload.execution_payload.transactions = txs; let block = BeaconBlock::Bellatrix(block); - assert!(block.ssz_bytes_len() <= max_rpc_size(fork_context, spec.max_chunk_size as usize)); + assert!(block.ssz_bytes_len() <= spec.max_payload_size as usize); block } /// Bellatrix block with length > MAX_RPC_SIZE. /// The max limit for a bellatrix block is in the order of ~16GiB which wouldn't fit in memory. /// Hence, we generate a bellatrix block just greater than `MAX_RPC_SIZE` to test rejection on the rpc layer. -fn bellatrix_block_large(fork_context: &ForkContext, spec: &ChainSpec) -> BeaconBlock { +fn bellatrix_block_large(spec: &ChainSpec) -> BeaconBlock { let mut block = BeaconBlockBellatrix::::empty(spec); let tx = VariableList::from(vec![0; 1024]); let txs = VariableList::from(std::iter::repeat_n(tx, 100000).collect::>()); @@ -45,7 +45,7 @@ fn bellatrix_block_large(fork_context: &ForkContext, spec: &ChainSpec) -> Beacon block.body.execution_payload.execution_payload.transactions = txs; let block = BeaconBlock::Bellatrix(block); - assert!(block.ssz_bytes_len() > max_rpc_size(fork_context, spec.max_chunk_size as usize)); + assert!(block.ssz_bytes_len() > spec.max_payload_size as usize); block } @@ -188,7 +188,7 @@ fn test_tcp_blocks_by_range_chunked_rpc() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_altair = Response::BlocksByRange(Some(Arc::new(signed_full_block))); - let full_block = bellatrix_block_small(&common::fork_context(ForkName::Bellatrix), &spec); + let full_block = bellatrix_block_small(&spec); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_bellatrix_small = Response::BlocksByRange(Some(Arc::new(signed_full_block))); @@ -442,7 +442,7 @@ fn test_tcp_blocks_by_range_over_limit() { })); // BlocksByRange Response - let full_block = bellatrix_block_large(&common::fork_context(ForkName::Bellatrix), &spec); + let full_block = bellatrix_block_large(&spec); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_bellatrix_large = Response::BlocksByRange(Some(Arc::new(signed_full_block))); @@ -813,7 +813,7 @@ fn test_tcp_blocks_by_root_chunked_rpc() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_altair = Response::BlocksByRoot(Some(Arc::new(signed_full_block))); - let full_block = bellatrix_block_small(&common::fork_context(ForkName::Bellatrix), &spec); + let full_block = bellatrix_block_small(&spec); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_bellatrix_small = Response::BlocksByRoot(Some(Arc::new(signed_full_block))); diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 7c38ae9d75..b129b54841 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -88,6 +88,15 @@ pub static BEACON_PROCESSOR_IMPORT_ERRORS_PER_TYPE: LazyLock> = + LazyLock::new(|| { + try_create_histogram_vec_with_buckets( + "beacon_processor_get_block_roots_time_seconds", + "Time to complete get_block_roots when serving by_range requests", + decimal_buckets(-3, -1), + &["source"], + ) + }); /* * Gossip processor diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 4694c926c9..bc97f88492 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -1,9 +1,10 @@ +use crate::metrics; use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::SyncMessage; use beacon_chain::{BeaconChainError, BeaconChainTypes, WhenSlotSkipped}; -use itertools::process_results; +use itertools::{process_results, Itertools}; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, }; @@ -588,97 +589,57 @@ impl NetworkBeaconProcessor { inbound_request_id: InboundRequestId, req: BlocksByRangeRequest, ) -> Result<(), (RpcErrorResponse, &'static str)> { + let req_start_slot = *req.start_slot(); + let req_count = *req.count(); + debug!( %peer_id, - count = req.count(), - start_slot = %req.start_slot(), + count = req_count, + start_slot = %req_start_slot, "Received BlocksByRange Request" ); - let forwards_block_root_iter = match self - .chain - .forwards_iter_block_roots(Slot::from(*req.start_slot())) - { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!( - requested_slot = %slot, - oldest_known_slot = %oldest_block_slot, - "Range request failed during backfill" - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!( - request = ?req, - %peer_id, - error = ?e, - "Unable to obtain root iter" - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Pick out the required blocks, ignoring skip-slots. - let mut last_block_root = None; - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| { - slot.as_u64() < req.start_slot().saturating_add(*req.count()) - }) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!( - request = ?req, - %peer_id, - error = ?e, - "Error during iteration over blocks" - ); - return Err((RpcErrorResponse::ServerError, "Iteration error")); - } - }; - - // remove all skip slots - let block_roots = block_roots.into_iter().flatten().collect::>(); + // Spawn a blocking handle since get_block_roots_for_slot_range takes a sync lock on the + // fork-choice. + let network_beacon_processor = self.clone(); + let block_roots = self + .executor + .spawn_blocking_handle( + move || { + network_beacon_processor.get_block_roots_for_slot_range( + req_start_slot, + req_count, + "BlocksByRange", + ) + }, + "get_block_roots_for_slot_range", + ) + .ok_or((RpcErrorResponse::ServerError, "shutting down"))? + .await + .map_err(|_| (RpcErrorResponse::ServerError, "tokio join"))??; let current_slot = self .chain .slot() .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); - let log_results = |req: BlocksByRangeRequest, peer_id, blocks_sent| { - if blocks_sent < (*req.count() as usize) { + let log_results = |peer_id, blocks_sent| { + if blocks_sent < (req_count as usize) { debug!( %peer_id, msg = "Failed to return all requested blocks", - start_slot = %req.start_slot(), + start_slot = %req_start_slot, %current_slot, - requested = req.count(), + requested = req_count, returned = blocks_sent, "BlocksByRange outgoing response processed" ); } else { debug!( %peer_id, - start_slot = %req.start_slot(), + start_slot = %req_start_slot, %current_slot, - requested = req.count(), + requested = req_count, returned = blocks_sent, "BlocksByRange outgoing response processed" ); @@ -700,8 +661,7 @@ impl NetworkBeaconProcessor { Ok(Some(block)) => { // Due to skip slots, blocks could be out of the range, we ensure they // are in the range before sending - if block.slot() >= *req.start_slot() - && block.slot() < req.start_slot() + req.count() + if block.slot() >= req_start_slot && block.slot() < req_start_slot + req.count() { blocks_sent += 1; self.send_network_message(NetworkMessage::SendResponse { @@ -718,7 +678,7 @@ impl NetworkBeaconProcessor { request_root = ?root, "Block in the chain is not in the store" ); - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); return Err((RpcErrorResponse::ServerError, "Database inconsistency")); } Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { @@ -727,7 +687,7 @@ impl NetworkBeaconProcessor { reason = "execution layer not synced", "Failed to fetch execution payload for blocks by range request" ); - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); // send the stream terminator return Err(( RpcErrorResponse::ResourceUnavailable, @@ -753,17 +713,144 @@ impl NetworkBeaconProcessor { "Error fetching block for peer" ); } - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); // send the stream terminator return Err((RpcErrorResponse::ServerError, "Failed fetching blocks")); } } } - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); Ok(()) } + fn get_block_roots_for_slot_range( + &self, + req_start_slot: u64, + req_count: u64, + req_type: &str, + ) -> Result, (RpcErrorResponse, &'static str)> { + let start_time = std::time::Instant::now(); + let finalized_slot = self + .chain + .canonical_head + .cached_head() + .finalized_checkpoint() + .epoch + .start_slot(T::EthSpec::slots_per_epoch()); + + let (block_roots, source) = if req_start_slot >= finalized_slot.as_u64() { + // If the entire requested range is after finalization, use fork_choice + ( + self.chain + .block_roots_from_fork_choice(req_start_slot, req_count), + "fork_choice", + ) + } else if req_start_slot + req_count <= finalized_slot.as_u64() { + // If the entire requested range is before finalization, use store + ( + self.get_block_roots_from_store(req_start_slot, req_count)?, + "store", + ) + } else { + // Split the request at the finalization boundary + let count_from_store = finalized_slot.as_u64() - req_start_slot; + let count_from_fork_choice = req_count - count_from_store; + let start_slot_fork_choice = finalized_slot.as_u64(); + + // Get roots from store (up to and including finalized slot) + let mut roots_from_store = + self.get_block_roots_from_store(req_start_slot, count_from_store)?; + + // Get roots from fork choice (after finalized slot) + let roots_from_fork_choice = self + .chain + .block_roots_from_fork_choice(start_slot_fork_choice, count_from_fork_choice); + + roots_from_store.extend(roots_from_fork_choice); + + (roots_from_store, "mixed") + }; + + let elapsed = start_time.elapsed(); + metrics::observe_timer_vec( + &metrics::BEACON_PROCESSOR_GET_BLOCK_ROOTS_TIME, + &[source], + elapsed, + ); + + debug!( + req_type, + start_slot = %req_start_slot, + req_count, + roots_count = block_roots.len(), + source, + elapsed = ?elapsed, + %finalized_slot, + "Range request block roots retrieved" + ); + + Ok(block_roots) + } + + /// Get block roots for a `BlocksByRangeRequest` from the store using roots iterator. + fn get_block_roots_from_store( + &self, + start_slot: u64, + count: u64, + ) -> Result, (RpcErrorResponse, &'static str)> { + let forwards_block_root_iter = + match self.chain.forwards_iter_block_roots(Slot::from(start_slot)) { + Ok(iter) => iter, + Err(BeaconChainError::HistoricalBlockOutOfRange { + slot, + oldest_block_slot, + }) => { + debug!( + requested_slot = %slot, + oldest_known_slot = %oldest_block_slot, + "Range request failed during backfill" + ); + return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); + } + Err(e) => { + error!( + %start_slot, + count, + error = ?e, + "Unable to obtain root iter for range request" + ); + return Err((RpcErrorResponse::ServerError, "Database error")); + } + }; + + // Pick out the required blocks, ignoring skip-slots. + let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { + iter.take_while(|(_, slot)| slot.as_u64() < start_slot.saturating_add(count)) + .collect::>() + }); + + let block_roots = match maybe_block_roots { + Ok(block_roots) => block_roots, + Err(e) => { + error!( + %start_slot, + count, + error = ?e, + "Error during iteration over blocks for range request" + ); + return Err((RpcErrorResponse::ServerError, "Iteration error")); + } + }; + + // remove all skip slots i.e. duplicated roots + Ok(block_roots + .into_iter() + .map(|(root, _)| root) + .unique() + .collect::>()) + } + /// Handle a `BlobsByRange` request from the peer. pub fn handle_blobs_by_range_request( self: Arc, @@ -830,68 +917,8 @@ impl NetworkBeaconProcessor { }; } - let forwards_block_root_iter = - match self.chain.forwards_iter_block_roots(request_start_slot) { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!( - requested_slot = %slot, - oldest_known_slot = %oldest_block_slot, - "Range request failed during backfill" - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!( - request = ?req, - %peer_id, - error = ?e, - "Unable to obtain root iter" - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Use `WhenSlotSkipped::Prev` to get the most recent block root prior to - // `request_start_slot` in order to check whether the `request_start_slot` is a skip. - let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| { - self.chain - .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) - .ok() - .flatten() - }); - - // Pick out the required blocks, ignoring skip-slots. - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!( - request = ?req, - %peer_id, - error = ?e, - "Error during iteration over blocks" - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; + let block_roots = + self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?; let current_slot = self .chain @@ -909,8 +936,6 @@ impl NetworkBeaconProcessor { ); }; - // remove all skip slots - let block_roots = block_roots.into_iter().flatten(); let mut blobs_sent = 0; for root in block_roots { @@ -1022,71 +1047,8 @@ impl NetworkBeaconProcessor { }; } - let forwards_block_root_iter = - match self.chain.forwards_iter_block_roots(request_start_slot) { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!( - requested_slot = %slot, - oldest_known_slot = %oldest_block_slot, - "Range request failed during backfill" - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!( - request = ?req, - %peer_id, - error = ?e, - "Unable to obtain root iter" - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Use `WhenSlotSkipped::Prev` to get the most recent block root prior to - // `request_start_slot` in order to check whether the `request_start_slot` is a skip. - let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| { - self.chain - .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) - .ok() - .flatten() - }); - - // Pick out the required blocks, ignoring skip-slots. - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!( - request = ?req, - %peer_id, - error = ?e, - "Error during iteration over blocks" - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // remove all skip slots - let block_roots = block_roots.into_iter().flatten(); + let block_roots = + self.get_block_roots_for_slot_range(req.start_slot, req.count, "DataColumnsByRange")?; let mut data_columns_sent = 0; for root in block_roots { diff --git a/beacon_node/operation_pool/src/attestation_storage.rs b/beacon_node/operation_pool/src/attestation_storage.rs index 49ef5c279c..67c24b9c7a 100644 --- a/beacon_node/operation_pool/src/attestation_storage.rs +++ b/beacon_node/operation_pool/src/attestation_storage.rs @@ -1,6 +1,6 @@ use crate::AttestationStats; use itertools::Itertools; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use types::{ attestation::{AttestationBase, AttestationElectra}, superstruct, AggregateSignature, Attestation, AttestationData, BeaconState, BitList, BitVector, @@ -119,6 +119,18 @@ impl CompactAttestationRef<'_, E> { } } + pub fn get_committee_indices_map(&self) -> HashSet { + match self.indexed { + CompactIndexedAttestation::Base(_) => HashSet::from([self.data.index]), + CompactIndexedAttestation::Electra(indexed_att) => indexed_att + .committee_bits + .iter() + .enumerate() + .filter_map(|(index, bit)| if bit { Some(index as u64) } else { None }) + .collect(), + } + } + pub fn clone_as_attestation(&self) -> Attestation { match self.indexed { CompactIndexedAttestation::Base(indexed_att) => Attestation::Base(AttestationBase { @@ -268,7 +280,11 @@ impl CompactIndexedAttestationElectra { } pub fn committee_index(&self) -> Option { - self.get_committee_indices().first().copied() + self.committee_bits + .iter() + .enumerate() + .find(|&(_, bit)| bit) + .map(|(index, _)| index as u64) } pub fn get_committee_indices(&self) -> Vec { diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 584a5f9f32..ec8c6640b1 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -1,5 +1,5 @@ mod attestation; -mod attestation_storage; +pub mod attestation_storage; mod attester_slashing; mod bls_to_execution_changes; mod max_cover; @@ -47,7 +47,7 @@ type SyncContributions = RwLock { /// Map from attestation ID (see below) to vectors of attestations. - attestations: RwLock>, + pub attestations: RwLock>, /// Map from sync aggregate ID to the best `SyncCommitteeContribution`s seen for that ID. sync_contributions: SyncContributions, /// Set of attester slashings, and the fork version they were verified against. @@ -673,12 +673,12 @@ impl OperationPool { /// This method may return objects that are invalid for block inclusion. pub fn get_filtered_attestations(&self, filter: F) -> Vec> where - F: Fn(&AttestationData) -> bool, + F: Fn(&AttestationData, HashSet) -> bool, { self.attestations .read() .iter() - .filter(|att| filter(&att.attestation_data())) + .filter(|att| filter(&att.attestation_data(), att.get_committee_indices_map())) .map(|att| att.clone_as_attestation()) .collect() } diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 8723c2d708..e887aa9abc 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -661,10 +661,7 @@ pub fn get_config( }; } - client_config.chain.max_network_size = lighthouse_network::gossip_max_size( - spec.bellatrix_fork_epoch.is_some(), - spec.gossip_max_size as usize, - ); + client_config.chain.max_network_size = spec.max_payload_size as usize; if cli_args.get_flag("slasher") { let slasher_dir = if let Some(slasher_dir) = cli_args.get_one::("slasher-dir") { diff --git a/common/eth2/Cargo.toml b/common/eth2/Cargo.toml index 35dd806fb3..5d0ad1f45e 100644 --- a/common/eth2/Cargo.toml +++ b/common/eth2/Cargo.toml @@ -19,6 +19,7 @@ mediatype = "0.19.13" multiaddr = "0.18.2" pretty_reqwest_error = { workspace = true } proto_array = { workspace = true } +rand = { workspace = true } reqwest = { workspace = true } reqwest-eventsource = "0.5.0" sensitive_url = { workspace = true } @@ -26,6 +27,7 @@ serde = { workspace = true } serde_json = { workspace = true } slashing_protection = { workspace = true } ssz_types = { workspace = true } +test_random_derive = { path = "../../common/test_random_derive" } types = { workspace = true } zeroize = { workspace = true } diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 1c55220b50..d114d037eb 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -820,6 +820,26 @@ impl BeaconNodeHttpClient { self.get_opt(path).await } + /// `GET beacon/states/{state_id}/pending_consolidations` + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn get_beacon_states_pending_consolidations( + &self, + state_id: StateId, + ) -> Result>>, Error> + { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("states") + .push(&state_id.to_string()) + .push("pending_consolidations"); + + self.get_opt(path).await + } + /// `GET beacon/light_client/updates` /// /// Returns `Ok(None)` on a 404 error. diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 2e9bac0397..0f9fa7f2c2 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -18,7 +18,9 @@ use std::fmt::{self, Display}; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use test_random_derive::TestRandom; use types::beacon_block_body::KzgCommitments; +use types::test_utils::TestRandom; pub use types::*; #[cfg(feature = "lighthouse")] @@ -802,13 +804,13 @@ pub struct LightClientUpdatesQuery { } #[derive(Encode, Decode)] -pub struct LightClientUpdateSszResponse { - pub response_chunk_len: Vec, - pub response_chunk: Vec, +pub struct LightClientUpdateResponseChunk { + pub response_chunk_len: u64, + pub response_chunk: LightClientUpdateResponseChunkInner, } #[derive(Encode, Decode)] -pub struct LightClientUpdateResponseChunk { +pub struct LightClientUpdateResponseChunkInner { pub context: [u8; 4], pub payload: Vec, } @@ -1997,11 +1999,11 @@ impl ForkVersionDeserialize for FullPayloadContents { fork_name: ForkName, ) -> Result { if fork_name.deneb_enabled() { - serde_json::from_value(value) + ExecutionPayloadAndBlobs::deserialize_by_fork::<'de, D>(value, fork_name) .map(Self::PayloadAndBlobs) .map_err(serde::de::Error::custom) } else if fork_name.bellatrix_enabled() { - serde_json::from_value(value) + ExecutionPayload::deserialize_by_fork::<'de, D>(value, fork_name) .map(Self::Payload) .map_err(serde::de::Error::custom) } else { @@ -2019,6 +2021,28 @@ pub struct ExecutionPayloadAndBlobs { pub blobs_bundle: BlobsBundle, } +impl ForkVersionDeserialize for ExecutionPayloadAndBlobs { + fn deserialize_by_fork<'de, D: Deserializer<'de>>( + value: Value, + fork_name: ForkName, + ) -> Result { + #[derive(Deserialize)] + #[serde(bound = "E: EthSpec")] + struct Helper { + execution_payload: serde_json::Value, + blobs_bundle: BlobsBundle, + } + let helper: Helper = serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(Self { + execution_payload: ExecutionPayload::deserialize_by_fork::<'de, D>( + helper.execution_payload, + fork_name, + )?, + blobs_bundle: helper.blobs_bundle, + }) + } +} + impl ForkVersionDecode for ExecutionPayloadAndBlobs { fn from_ssz_bytes_by_fork(bytes: &[u8], fork_name: ForkName) -> Result { let mut builder = ssz::SszDecoderBuilder::new(bytes); @@ -2049,7 +2073,7 @@ pub enum ContentType { Ssz, } -#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, Encode, Decode)] +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, Encode, Decode, TestRandom)] #[serde(bound = "E: EthSpec")] pub struct BlobsBundle { pub commitments: KzgCommitments, @@ -2144,6 +2168,10 @@ pub struct StandardAttestationRewards { #[cfg(test)] mod test { + use std::fmt::Debug; + + use types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; + use super::*; #[test] @@ -2157,4 +2185,107 @@ mod test { let y: ValidatorId = serde_json::from_str(pubkey_str).unwrap(); assert_eq!(serde_json::to_string(&y).unwrap(), pubkey_str); } + + #[test] + fn test_execution_payload_execution_payload_deserialize_by_fork() { + let rng = &mut XorShiftRng::from_seed([42; 16]); + + let payloads = [ + ExecutionPayload::Bellatrix( + ExecutionPayloadBellatrix::::random_for_test(rng), + ), + ExecutionPayload::Capella(ExecutionPayloadCapella::::random_for_test( + rng, + )), + ExecutionPayload::Deneb(ExecutionPayloadDeneb::::random_for_test( + rng, + )), + ExecutionPayload::Electra(ExecutionPayloadElectra::::random_for_test( + rng, + )), + ExecutionPayload::Fulu(ExecutionPayloadFulu::::random_for_test(rng)), + ]; + let merged_forks = &ForkName::list_all()[2..]; + assert_eq!( + payloads.len(), + merged_forks.len(), + "we should test every known fork; add new fork variant to payloads above" + ); + + for (payload, &fork_name) in payloads.into_iter().zip(merged_forks) { + assert_eq!(payload.fork_name(), fork_name); + let payload_str = serde_json::to_string(&payload).unwrap(); + let mut de = serde_json::Deserializer::from_str(&payload_str); + generic_deserialize_by_fork(&mut de, payload, fork_name); + } + } + + #[test] + fn test_execution_payload_and_blobs_deserialize_by_fork() { + let rng = &mut XorShiftRng::from_seed([42; 16]); + + let payloads = [ + { + let execution_payload = + ExecutionPayload::Deneb( + ExecutionPayloadDeneb::::random_for_test(rng), + ); + let blobs_bundle = BlobsBundle::random_for_test(rng); + ExecutionPayloadAndBlobs { + execution_payload, + blobs_bundle, + } + }, + { + let execution_payload = + ExecutionPayload::Electra( + ExecutionPayloadElectra::::random_for_test(rng), + ); + let blobs_bundle = BlobsBundle::random_for_test(rng); + ExecutionPayloadAndBlobs { + execution_payload, + blobs_bundle, + } + }, + { + let execution_payload = + ExecutionPayload::Fulu( + ExecutionPayloadFulu::::random_for_test(rng), + ); + let blobs_bundle = BlobsBundle::random_for_test(rng); + ExecutionPayloadAndBlobs { + execution_payload, + blobs_bundle, + } + }, + ]; + let blob_forks = &ForkName::list_all()[4..]; + + assert_eq!( + payloads.len(), + blob_forks.len(), + "we should test every known fork; add new fork variant to payloads above" + ); + + for (payload, &fork_name) in payloads.into_iter().zip(blob_forks) { + assert_eq!(payload.execution_payload.fork_name(), fork_name); + let payload_str = serde_json::to_string(&payload).unwrap(); + let mut de = serde_json::Deserializer::from_str(&payload_str); + generic_deserialize_by_fork(&mut de, payload, fork_name); + } + } + + fn generic_deserialize_by_fork< + 'de, + D: Deserializer<'de>, + O: ForkVersionDeserialize + PartialEq + Debug, + >( + deserializer: D, + original: O, + fork_name: ForkName, + ) { + let val = Value::deserialize(deserializer).unwrap(); + let roundtrip = O::deserialize_by_fork::<'de, D>(val, fork_name).unwrap(); + assert_eq!(original, roundtrip); + } } diff --git a/common/eth2_network_config/built_in_network_configs/chiado/config.yaml b/common/eth2_network_config/built_in_network_configs/chiado/config.yaml index 1455ec5f63..dbfe2707d7 100644 --- a/common/eth2_network_config/built_in_network_configs/chiado/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/chiado/config.yaml @@ -100,15 +100,13 @@ DEPOSIT_CONTRACT_ADDRESS: 0xb97036A26259B7147018913bD58a774cf91acf25 # Networking # --------------------------------------------------------------- # `10 * 2**20` (= 10485760, 10 MiB) -GOSSIP_MAX_SIZE: 10485760 +MAX_PAYLOAD_SIZE: 10485760 # `2**10` (= 1024) MAX_REQUEST_BLOCKS: 1024 # `2**8` (= 256) EPOCHS_PER_SUBNET_SUBSCRIPTION: 256 # 33024, ~31 days MIN_EPOCHS_FOR_BLOCK_REQUESTS: 33024 -# `10 * 2**20` (=10485760, 10 MiB) -MAX_CHUNK_SIZE: 10485760 # 5s TTFB_TIMEOUT: 5 # 10s diff --git a/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml b/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml index 9ff5a16198..4413c21c4b 100644 --- a/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml @@ -43,7 +43,7 @@ DENEB_FORK_VERSION: 0x04000064 DENEB_FORK_EPOCH: 889856 # 2024-03-11T18:30:20.000Z # Electra ELECTRA_FORK_VERSION: 0x05000064 -ELECTRA_FORK_EPOCH: 18446744073709551615 +ELECTRA_FORK_EPOCH: 1337856 # 2025-04-30T14:03:40.000Z # Fulu FULU_FORK_VERSION: 0x06000064 FULU_FORK_EPOCH: 18446744073709551615 @@ -97,9 +97,8 @@ DEPOSIT_CONTRACT_ADDRESS: 0x0B98057eA310F4d31F2a452B414647007d1645d9 # Network # --------------------------------------------------------------- SUBNETS_PER_NODE: 4 -GOSSIP_MAX_SIZE: 10485760 +MAX_PAYLOAD_SIZE: 10485760 MIN_EPOCHS_FOR_BLOCK_REQUESTS: 33024 -MAX_CHUNK_SIZE: 10485760 TTFB_TIMEOUT: 5 RESP_TIMEOUT: 10 MESSAGE_DOMAIN_INVALID_SNAPPY: 0x00000000 @@ -118,8 +117,20 @@ MAX_REQUEST_BLOB_SIDECARS: 768 MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: 16384 # `6` BLOB_SIDECAR_SUBNET_COUNT: 6 -# `uint64(6)` -MAX_BLOBS_PER_BLOCK: 6 +# `uint64(2)` +MAX_BLOBS_PER_BLOCK: 2 + +# Electra +# 2**7 * 10**9 (= 128,000,000,000) +MIN_PER_EPOCH_CHURN_LIMIT_ELECTRA: 128000000000 +# 2**6 * 10**9 (= 64,000,000,000) +MAX_PER_EPOCH_ACTIVATION_EXIT_CHURN_LIMIT: 64000000000 +# `2` +BLOB_SIDECAR_SUBNET_COUNT_ELECTRA: 2 +# `uint64(2)` +MAX_BLOBS_PER_BLOCK_ELECTRA: 2 +# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA +MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 256 # DAS NUMBER_OF_COLUMNS: 128 diff --git a/common/eth2_network_config/built_in_network_configs/holesky/config.yaml b/common/eth2_network_config/built_in_network_configs/holesky/config.yaml index e5f38b8c9b..58010991bf 100644 --- a/common/eth2_network_config/built_in_network_configs/holesky/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/holesky/config.yaml @@ -88,15 +88,13 @@ DEPOSIT_CONTRACT_ADDRESS: 0x4242424242424242424242424242424242424242 # Networking # --------------------------------------------------------------- # `10 * 2**20` (= 10485760, 10 MiB) -GOSSIP_MAX_SIZE: 10485760 +MAX_PAYLOAD_SIZE: 10485760 # `2**10` (= 1024) MAX_REQUEST_BLOCKS: 1024 # `2**8` (= 256) EPOCHS_PER_SUBNET_SUBSCRIPTION: 256 # `MIN_VALIDATOR_WITHDRAWABILITY_DELAY + CHURN_LIMIT_QUOTIENT // 2` (= 33024, ~5 months) MIN_EPOCHS_FOR_BLOCK_REQUESTS: 33024 -# `10 * 2**20` (=10485760, 10 MiB) -MAX_CHUNK_SIZE: 10485760 # 5s TTFB_TIMEOUT: 5 # 10s diff --git a/common/eth2_network_config/built_in_network_configs/hoodi/config.yaml b/common/eth2_network_config/built_in_network_configs/hoodi/config.yaml index 19d7797424..5cca1cd037 100644 --- a/common/eth2_network_config/built_in_network_configs/hoodi/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/hoodi/config.yaml @@ -93,15 +93,13 @@ DEPOSIT_CONTRACT_ADDRESS: 0x00000000219ab540356cBB839Cbe05303d7705Fa # Networking # --------------------------------------------------------------- # `10 * 2**20` (= 10485760, 10 MiB) -GOSSIP_MAX_SIZE: 10485760 +MAX_PAYLOAD_SIZE: 10485760 # `2**10` (= 1024) MAX_REQUEST_BLOCKS: 1024 # `2**8` (= 256) EPOCHS_PER_SUBNET_SUBSCRIPTION: 256 # `MIN_VALIDATOR_WITHDRAWABILITY_DELAY + CHURN_LIMIT_QUOTIENT // 2` (= 33024, ~5 months) MIN_EPOCHS_FOR_BLOCK_REQUESTS: 33024 -# `10 * 2**20` (=10485760, 10 MiB) -MAX_CHUNK_SIZE: 10485760 # 5s TTFB_TIMEOUT: 5 # 10s diff --git a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml index 74fe727867..375441e504 100644 --- a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml @@ -49,7 +49,7 @@ DENEB_FORK_VERSION: 0x04000000 DENEB_FORK_EPOCH: 269568 # March 13, 2024, 01:55:35pm UTC # Electra ELECTRA_FORK_VERSION: 0x05000000 -ELECTRA_FORK_EPOCH: 18446744073709551615 +ELECTRA_FORK_EPOCH: 364032 # May 7, 2025, 10:05:11am UTC # Fulu FULU_FORK_VERSION: 0x06000000 FULU_FORK_EPOCH: 18446744073709551615 @@ -103,15 +103,13 @@ DEPOSIT_CONTRACT_ADDRESS: 0x00000000219ab540356cBB839Cbe05303d7705Fa # Networking # --------------------------------------------------------------- # `10 * 2**20` (= 10485760, 10 MiB) -GOSSIP_MAX_SIZE: 10485760 +MAX_PAYLOAD_SIZE: 10485760 # `2**10` (= 1024) MAX_REQUEST_BLOCKS: 1024 # `2**8` (= 256) EPOCHS_PER_SUBNET_SUBSCRIPTION: 256 # `MIN_VALIDATOR_WITHDRAWABILITY_DELAY + CHURN_LIMIT_QUOTIENT // 2` (= 33024, ~5 months) MIN_EPOCHS_FOR_BLOCK_REQUESTS: 33024 -# `10 * 2**20` (=10485760, 10 MiB) -MAX_CHUNK_SIZE: 10485760 # 5s TTFB_TIMEOUT: 5 # 10s @@ -142,6 +140,18 @@ BLOB_SIDECAR_SUBNET_COUNT: 6 # `uint64(6)` MAX_BLOBS_PER_BLOCK: 6 +# Electra +# 2**7 * 10**9 (= 128,000,000,000) +MIN_PER_EPOCH_CHURN_LIMIT_ELECTRA: 128000000000 +# 2**8 * 10**9 (= 256,000,000,000) +MAX_PER_EPOCH_ACTIVATION_EXIT_CHURN_LIMIT: 256000000000 +# `9` +BLOB_SIDECAR_SUBNET_COUNT_ELECTRA: 9 +# `uint64(9)` +MAX_BLOBS_PER_BLOCK_ELECTRA: 9 +# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA +MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152 + # DAS NUMBER_OF_COLUMNS: 128 NUMBER_OF_CUSTODY_GROUPS: 128 diff --git a/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml b/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml index af78332205..e9e8a3ab14 100644 --- a/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml @@ -89,15 +89,13 @@ DEPOSIT_CONTRACT_ADDRESS: 0x7f02C3E3c98b133055B8B348B2Ac625669Ed295D # Networking # --------------------------------------------------------------- # `10 * 2**20` (= 10485760, 10 MiB) -GOSSIP_MAX_SIZE: 10485760 +MAX_PAYLOAD_SIZE: 10485760 # `2**10` (= 1024) MAX_REQUEST_BLOCKS: 1024 # `2**8` (= 256) EPOCHS_PER_SUBNET_SUBSCRIPTION: 256 # `MIN_VALIDATOR_WITHDRAWABILITY_DELAY + CHURN_LIMIT_QUOTIENT // 2` (= 33024, ~5 months) MIN_EPOCHS_FOR_BLOCK_REQUESTS: 33024 -# `10 * 2**20` (=10485760, 10 MiB) -MAX_CHUNK_SIZE: 10485760 # 5s TTFB_TIMEOUT: 5 # 10s diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs index 880c93d5c9..dde2411787 100644 --- a/consensus/proto_array/src/proto_array_fork_choice.rs +++ b/consensus/proto_array/src/proto_array_fork_choice.rs @@ -856,10 +856,18 @@ impl ProtoArrayForkChoice { } /// See `ProtoArray::iter_nodes` - pub fn iter_nodes<'a>(&'a self, block_root: &Hash256) -> Iter<'a> { + pub fn iter_nodes(&self, block_root: &Hash256) -> Iter { self.proto_array.iter_nodes(block_root) } + /// See `ProtoArray::iter_block_roots` + pub fn iter_block_roots( + &self, + block_root: &Hash256, + ) -> impl Iterator + use<'_> { + self.proto_array.iter_block_roots(block_root) + } + pub fn as_bytes(&self) -> Vec { SszContainer::from(self).as_ssz_bytes() } diff --git a/consensus/state_processing/src/genesis.rs b/consensus/state_processing/src/genesis.rs index 10723ecc51..8e62427ef1 100644 --- a/consensus/state_processing/src/genesis.rs +++ b/consensus/state_processing/src/genesis.rs @@ -123,8 +123,7 @@ pub fn initialize_beacon_state_from_eth1( // Remove intermediate Deneb fork from `state.fork`. state.fork_mut().previous_version = spec.electra_fork_version; - // TODO(electra): think about this more and determine the best way to - // do this. The spec tests will expect that the sync committees are + // The spec tests will expect that the sync committees are // calculated using the electra value for MAX_EFFECTIVE_BALANCE when // calling `initialize_beacon_state_from_eth1()`. But the sync committees // are actually calcuated back in `upgrade_to_altair()`. We need to diff --git a/consensus/state_processing/src/per_block_processing/verify_attestation.rs b/consensus/state_processing/src/per_block_processing/verify_attestation.rs index 0b399bea6c..6b4a394c73 100644 --- a/consensus/state_processing/src/per_block_processing/verify_attestation.rs +++ b/consensus/state_processing/src/per_block_processing/verify_attestation.rs @@ -63,7 +63,7 @@ pub fn verify_attestation_for_state<'ctxt, E: EthSpec>( ) -> Result> { let data = attestation.data(); - // TODO(electra) choosing a validation based on the attestation's fork + // NOTE: choosing a validation based on the attestation's fork // rather than the state's fork makes this simple, but technically the spec // defines this verification based on the state's fork. match attestation { diff --git a/consensus/types/presets/mainnet/electra.yaml b/consensus/types/presets/mainnet/electra.yaml index 42afbb233e..55308d5b1c 100644 --- a/consensus/types/presets/mainnet/electra.yaml +++ b/consensus/types/presets/mainnet/electra.yaml @@ -7,44 +7,44 @@ MIN_ACTIVATION_BALANCE: 32000000000 # 2**11 * 10**9 (= 2,048,000,000,000) Gwei MAX_EFFECTIVE_BALANCE_ELECTRA: 2048000000000 -# State list lengths +# Rewards and penalties # --------------------------------------------------------------- -# `uint64(2**27)` (= 134,217,728) -PENDING_DEPOSITS_LIMIT: 134217728 -# `uint64(2**27)` (= 134,217,728) -PENDING_PARTIAL_WITHDRAWALS_LIMIT: 134217728 -# `uint64(2**18)` (= 262,144) -PENDING_CONSOLIDATIONS_LIMIT: 262144 - -# Reward and penalty quotients -# --------------------------------------------------------------- -# `uint64(2**12)` (= 4,096) +# 2**12 (= 4,096) MIN_SLASHING_PENALTY_QUOTIENT_ELECTRA: 4096 -# `uint64(2**12)` (= 4,096) +# 2**12 (= 4,096) WHISTLEBLOWER_REWARD_QUOTIENT_ELECTRA: 4096 -# # Max operations per block +# State list lengths # --------------------------------------------------------------- -# `uint64(2**0)` (= 1) +# 2**27 (= 134,217,728) pending deposits +PENDING_DEPOSITS_LIMIT: 134217728 +# 2**27 (= 134,217,728) pending partial withdrawals +PENDING_PARTIAL_WITHDRAWALS_LIMIT: 134217728 +# 2**18 (= 262,144) pending consolidations +PENDING_CONSOLIDATIONS_LIMIT: 262144 + +# Max operations per block +# --------------------------------------------------------------- +# 2**0 (= 1) attester slashings MAX_ATTESTER_SLASHINGS_ELECTRA: 1 -# `uint64(2**3)` (= 8) +# 2**3 (= 8) attestations MAX_ATTESTATIONS_ELECTRA: 8 -# `uint64(2**1)` (= 2) -MAX_CONSOLIDATION_REQUESTS_PER_PAYLOAD: 2 # Execution # --------------------------------------------------------------- -# 2**13 (= 8192) deposit requests +# 2**13 (= 8,192) deposit requests MAX_DEPOSIT_REQUESTS_PER_PAYLOAD: 8192 # 2**4 (= 16) withdrawal requests MAX_WITHDRAWAL_REQUESTS_PER_PAYLOAD: 16 +# 2**1 (= 2) consolidation requests +MAX_CONSOLIDATION_REQUESTS_PER_PAYLOAD: 2 # Withdrawals processing # --------------------------------------------------------------- -# 2**3 ( = 8) pending withdrawals +# 2**3 (= 8) pending withdrawals MAX_PENDING_PARTIALS_PER_WITHDRAWALS_SWEEP: 8 # Pending deposits processing # --------------------------------------------------------------- -# 2**4 ( = 4) pending deposits +# 2**4 (= 16) pending deposits MAX_PENDING_DEPOSITS_PER_EPOCH: 16 diff --git a/consensus/types/presets/minimal/electra.yaml b/consensus/types/presets/minimal/electra.yaml index 44e4769756..f99effe0f1 100644 --- a/consensus/types/presets/minimal/electra.yaml +++ b/consensus/types/presets/minimal/electra.yaml @@ -7,44 +7,44 @@ MIN_ACTIVATION_BALANCE: 32000000000 # 2**11 * 10**9 (= 2,048,000,000,000) Gwei MAX_EFFECTIVE_BALANCE_ELECTRA: 2048000000000 -# State list lengths +# Rewards and penalties # --------------------------------------------------------------- -# `uint64(2**27)` (= 134,217,728) -PENDING_DEPOSITS_LIMIT: 134217728 -# [customized] `uint64(2**6)` (= 64) -PENDING_PARTIAL_WITHDRAWALS_LIMIT: 64 -# [customized] `uint64(2**6)` (= 64) -PENDING_CONSOLIDATIONS_LIMIT: 64 - -# Reward and penalty quotients -# --------------------------------------------------------------- -# `uint64(2**12)` (= 4,096) +# 2**12 (= 4,096) MIN_SLASHING_PENALTY_QUOTIENT_ELECTRA: 4096 -# `uint64(2**12)` (= 4,096) +# 2**12 (= 4,096) WHISTLEBLOWER_REWARD_QUOTIENT_ELECTRA: 4096 -# # Max operations per block +# State list lengths # --------------------------------------------------------------- -# `uint64(2**0)` (= 1) +# 2**27 (= 134,217,728) pending deposits +PENDING_DEPOSITS_LIMIT: 134217728 +# [customized] 2**6 (= 64) pending partial withdrawals +PENDING_PARTIAL_WITHDRAWALS_LIMIT: 64 +# [customized] 2**6 (= 64) pending consolidations +PENDING_CONSOLIDATIONS_LIMIT: 64 + +# Max operations per block +# --------------------------------------------------------------- +# 2**0 (= 1) attester slashings MAX_ATTESTER_SLASHINGS_ELECTRA: 1 -# `uint64(2**3)` (= 8) +# 2**3 (= 8) attestations MAX_ATTESTATIONS_ELECTRA: 8 -# `uint64(2**1)` (= 2) -MAX_CONSOLIDATION_REQUESTS_PER_PAYLOAD: 2 # Execution # --------------------------------------------------------------- -# [customized] +# [customized] 2**2 (= 4) deposit requests MAX_DEPOSIT_REQUESTS_PER_PAYLOAD: 4 # [customized] 2**1 (= 2) withdrawal requests MAX_WITHDRAWAL_REQUESTS_PER_PAYLOAD: 2 +# 2**1 (= 2) consolidation requests +MAX_CONSOLIDATION_REQUESTS_PER_PAYLOAD: 2 # Withdrawals processing # --------------------------------------------------------------- -# 2**1 ( = 2) pending withdrawals +# 2**1 (= 2) pending withdrawals MAX_PENDING_PARTIALS_PER_WITHDRAWALS_SWEEP: 2 # Pending deposits processing # --------------------------------------------------------------- -# 2**4 ( = 4) pending deposits +# 2**4 (= 16) pending deposits MAX_PENDING_DEPOSITS_PER_EPOCH: 16 diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index 5d147f1e86..e769057182 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -5,6 +5,7 @@ use derivative::Derivative; use serde::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; use ssz_types::BitVector; +use std::collections::HashSet; use std::hash::{Hash, Hasher}; use superstruct::superstruct; use test_random_derive::TestRandom; @@ -210,6 +211,13 @@ impl Attestation { } } + pub fn get_committee_indices_map(&self) -> HashSet { + match self { + Attestation::Base(att) => HashSet::from([att.data.index]), + Attestation::Electra(att) => att.get_committee_indices().into_iter().collect(), + } + } + pub fn is_aggregation_bits_zero(&self) -> bool { match self { Attestation::Base(att) => att.aggregation_bits.is_zero(), @@ -293,7 +301,11 @@ impl AttestationRef<'_, E> { impl AttestationElectra { pub fn committee_index(&self) -> Option { - self.get_committee_indices().first().cloned() + self.committee_bits + .iter() + .enumerate() + .find(|&(_, bit)| bit) + .map(|(index, _)| index as u64) } pub fn get_aggregation_bits(&self) -> Vec { diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 1650001db6..27c324aa2a 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -210,10 +210,9 @@ pub struct ChainSpec { pub boot_nodes: Vec, pub network_id: u8, pub target_aggregators_per_committee: u64, - pub gossip_max_size: u64, + pub max_payload_size: u64, max_request_blocks: u64, pub min_epochs_for_block_requests: u64, - pub max_chunk_size: u64, pub ttfb_timeout: u64, pub resp_timeout: u64, pub attestation_propagation_slot_range: u64, @@ -716,6 +715,35 @@ impl ChainSpec { (0..self.data_column_sidecar_subnet_count).map(DataColumnSubnetId::new) } + /// Worst-case compressed length for a given payload of size n when using snappy. + /// + /// https://github.com/google/snappy/blob/32ded457c0b1fe78ceb8397632c416568d6714a0/snappy.cc#L218C1-L218C47 + /// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#max_compressed_len + fn max_compressed_len_snappy(n: usize) -> Option { + 32_usize.checked_add(n)?.checked_add(n / 6) + } + + /// Max compressed length of a message that we receive over gossip. + pub fn max_compressed_len(&self) -> usize { + Self::max_compressed_len_snappy(self.max_payload_size as usize) + .expect("should not overflow") + } + + /// Max allowed size of a raw, compressed message received over the network. + /// + /// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#max_compressed_len + pub fn max_message_size(&self) -> usize { + std::cmp::max( + // 1024 to account for framing + encoding overhead + Self::max_compressed_len_snappy(self.max_payload_size as usize) + .expect("should not overflow") + .safe_add(1024) + .expect("should not overflow"), + //1MB + 1024 * 1024, + ) + } + /// Returns a `ChainSpec` compatible with the Ethereum Foundation specification. pub fn mainnet() -> Self { Self { @@ -883,7 +911,7 @@ impl ChainSpec { * Electra hard fork params */ electra_fork_version: [0x05, 00, 00, 00], - electra_fork_epoch: None, + electra_fork_epoch: Some(Epoch::new(364032)), unset_deposit_requests_start_index: u64::MAX, full_exit_request_amount: 0, min_activation_balance: option_wrapper(|| { @@ -930,9 +958,8 @@ impl ChainSpec { subnets_per_node: 2, maximum_gossip_clock_disparity_millis: default_maximum_gossip_clock_disparity_millis(), target_aggregators_per_committee: 16, - gossip_max_size: default_gossip_max_size(), + max_payload_size: default_max_payload_size(), min_epochs_for_block_requests: default_min_epochs_for_block_requests(), - max_chunk_size: default_max_chunk_size(), ttfb_timeout: default_ttfb_timeout(), resp_timeout: default_resp_timeout(), message_domain_invalid_snappy: default_message_domain_invalid_snappy(), @@ -1213,7 +1240,7 @@ impl ChainSpec { * Electra hard fork params */ electra_fork_version: [0x05, 0x00, 0x00, 0x64], - electra_fork_epoch: None, + electra_fork_epoch: Some(Epoch::new(1337856)), unset_deposit_requests_start_index: u64::MAX, full_exit_request_amount: 0, min_activation_balance: option_wrapper(|| { @@ -1235,7 +1262,7 @@ impl ChainSpec { }) .expect("calculation does not overflow"), max_per_epoch_activation_exit_churn_limit: option_wrapper(|| { - u64::checked_pow(2, 8)?.checked_mul(u64::checked_pow(10, 9)?) + u64::checked_pow(2, 6)?.checked_mul(u64::checked_pow(10, 9)?) }) .expect("calculation does not overflow"), @@ -1260,9 +1287,8 @@ impl ChainSpec { subnets_per_node: 4, // Make this larger than usual to avoid network damage maximum_gossip_clock_disparity_millis: default_maximum_gossip_clock_disparity_millis(), target_aggregators_per_committee: 16, - gossip_max_size: default_gossip_max_size(), + max_payload_size: default_max_payload_size(), min_epochs_for_block_requests: 33024, - max_chunk_size: default_max_chunk_size(), ttfb_timeout: default_ttfb_timeout(), resp_timeout: default_resp_timeout(), message_domain_invalid_snappy: default_message_domain_invalid_snappy(), @@ -1278,7 +1304,7 @@ impl ChainSpec { max_request_data_column_sidecars: default_max_request_data_column_sidecars(), min_epochs_for_blob_sidecars_requests: 16384, blob_sidecar_subnet_count: default_blob_sidecar_subnet_count(), - max_blobs_per_block: default_max_blobs_per_block(), + max_blobs_per_block: 2, /* * Derived Deneb Specific @@ -1291,9 +1317,9 @@ impl ChainSpec { /* * Networking Electra specific */ - max_blobs_per_block_electra: default_max_blobs_per_block_electra(), - blob_sidecar_subnet_count_electra: default_blob_sidecar_subnet_count_electra(), - max_request_blob_sidecars_electra: default_max_request_blob_sidecars_electra(), + max_blobs_per_block_electra: 2, + blob_sidecar_subnet_count_electra: 2, + max_request_blob_sidecars_electra: 256, /* * Application specific @@ -1434,18 +1460,15 @@ pub struct Config { #[serde(with = "serde_utils::quoted_u64")] gas_limit_adjustment_factor: u64, - #[serde(default = "default_gossip_max_size")] + #[serde(default = "default_max_payload_size")] #[serde(with = "serde_utils::quoted_u64")] - gossip_max_size: u64, + max_payload_size: u64, #[serde(default = "default_max_request_blocks")] #[serde(with = "serde_utils::quoted_u64")] max_request_blocks: u64, #[serde(default = "default_min_epochs_for_block_requests")] #[serde(with = "serde_utils::quoted_u64")] min_epochs_for_block_requests: u64, - #[serde(default = "default_max_chunk_size")] - #[serde(with = "serde_utils::quoted_u64")] - max_chunk_size: u64, #[serde(default = "default_ttfb_timeout")] #[serde(with = "serde_utils::quoted_u64")] ttfb_timeout: u64, @@ -1580,7 +1603,7 @@ const fn default_gas_limit_adjustment_factor() -> u64 { 1024 } -const fn default_gossip_max_size() -> u64 { +const fn default_max_payload_size() -> u64 { 10485760 } @@ -1588,10 +1611,6 @@ const fn default_min_epochs_for_block_requests() -> u64 { 33024 } -const fn default_max_chunk_size() -> u64 { - 10485760 -} - const fn default_ttfb_timeout() -> u64 { 5 } @@ -1857,10 +1876,9 @@ impl Config { gas_limit_adjustment_factor: spec.gas_limit_adjustment_factor, - gossip_max_size: spec.gossip_max_size, + max_payload_size: spec.max_payload_size, max_request_blocks: spec.max_request_blocks, min_epochs_for_block_requests: spec.min_epochs_for_block_requests, - max_chunk_size: spec.max_chunk_size, ttfb_timeout: spec.ttfb_timeout, resp_timeout: spec.resp_timeout, attestation_propagation_slot_range: spec.attestation_propagation_slot_range, @@ -1938,9 +1956,8 @@ impl Config { deposit_network_id, deposit_contract_address, gas_limit_adjustment_factor, - gossip_max_size, + max_payload_size, min_epochs_for_block_requests, - max_chunk_size, ttfb_timeout, resp_timeout, message_domain_invalid_snappy, @@ -2009,9 +2026,8 @@ impl Config { terminal_total_difficulty, terminal_block_hash, terminal_block_hash_activation_epoch, - gossip_max_size, + max_payload_size, min_epochs_for_block_requests, - max_chunk_size, ttfb_timeout, resp_timeout, message_domain_invalid_snappy, @@ -2311,9 +2327,8 @@ mod yaml_tests { check_default!(terminal_block_hash); check_default!(terminal_block_hash_activation_epoch); check_default!(bellatrix_fork_version); - check_default!(gossip_max_size); + check_default!(max_payload_size); check_default!(min_epochs_for_block_requests); - check_default!(max_chunk_size); check_default!(ttfb_timeout); check_default!(resp_timeout); check_default!(message_domain_invalid_snappy); @@ -2339,4 +2354,17 @@ mod yaml_tests { [0, 0, 0, 1] ); } + + #[test] + fn test_max_network_limits_overflow() { + let mut spec = MainnetEthSpec::default_spec(); + // Should not overflow + let _ = spec.max_message_size(); + let _ = spec.max_compressed_len(); + + spec.max_payload_size *= 10; + // Should not overflow even with a 10x increase in max + let _ = spec.max_message_size(); + let _ = spec.max_compressed_len(); + } } diff --git a/consensus/types/src/preset.rs b/consensus/types/src/preset.rs index 707d2d4697..d025c72eac 100644 --- a/consensus/types/src/preset.rs +++ b/consensus/types/src/preset.rs @@ -227,28 +227,36 @@ pub struct ElectraPreset { pub min_activation_balance: u64, #[serde(with = "serde_utils::quoted_u64")] pub max_effective_balance_electra: u64, + #[serde(with = "serde_utils::quoted_u64")] pub min_slashing_penalty_quotient_electra: u64, #[serde(with = "serde_utils::quoted_u64")] pub whistleblower_reward_quotient_electra: u64, - #[serde(with = "serde_utils::quoted_u64")] - pub max_pending_partials_per_withdrawals_sweep: u64, + #[serde(with = "serde_utils::quoted_u64")] pub pending_deposits_limit: u64, #[serde(with = "serde_utils::quoted_u64")] pub pending_partial_withdrawals_limit: u64, #[serde(with = "serde_utils::quoted_u64")] pub pending_consolidations_limit: u64, - #[serde(with = "serde_utils::quoted_u64")] - pub max_consolidation_requests_per_payload: u64, - #[serde(with = "serde_utils::quoted_u64")] - pub max_deposit_requests_per_payload: u64, + #[serde(with = "serde_utils::quoted_u64")] pub max_attester_slashings_electra: u64, #[serde(with = "serde_utils::quoted_u64")] pub max_attestations_electra: u64, + + #[serde(with = "serde_utils::quoted_u64")] + pub max_deposit_requests_per_payload: u64, #[serde(with = "serde_utils::quoted_u64")] pub max_withdrawal_requests_per_payload: u64, + #[serde(with = "serde_utils::quoted_u64")] + pub max_consolidation_requests_per_payload: u64, + + #[serde(with = "serde_utils::quoted_u64")] + pub max_pending_partials_per_withdrawals_sweep: u64, + + #[serde(with = "serde_utils::quoted_u64")] + pub max_pending_deposits_per_epoch: u64, } impl ElectraPreset { @@ -256,19 +264,26 @@ impl ElectraPreset { Self { min_activation_balance: spec.min_activation_balance, max_effective_balance_electra: spec.max_effective_balance_electra, + min_slashing_penalty_quotient_electra: spec.min_slashing_penalty_quotient_electra, whistleblower_reward_quotient_electra: spec.whistleblower_reward_quotient_electra, - max_pending_partials_per_withdrawals_sweep: spec - .max_pending_partials_per_withdrawals_sweep, + pending_deposits_limit: E::pending_deposits_limit() as u64, pending_partial_withdrawals_limit: E::pending_partial_withdrawals_limit() as u64, pending_consolidations_limit: E::pending_consolidations_limit() as u64, - max_consolidation_requests_per_payload: E::max_consolidation_requests_per_payload() - as u64, - max_deposit_requests_per_payload: E::max_deposit_requests_per_payload() as u64, + max_attester_slashings_electra: E::max_attester_slashings_electra() as u64, max_attestations_electra: E::max_attestations_electra() as u64, + + max_deposit_requests_per_payload: E::max_deposit_requests_per_payload() as u64, max_withdrawal_requests_per_payload: E::max_withdrawal_requests_per_payload() as u64, + max_consolidation_requests_per_payload: E::max_consolidation_requests_per_payload() + as u64, + + max_pending_partials_per_withdrawals_sweep: spec + .max_pending_partials_per_withdrawals_sweep, + + max_pending_deposits_per_epoch: E::max_pending_deposits_per_epoch() as u64, } } } diff --git a/consensus/types/src/validator.rs b/consensus/types/src/validator.rs index 5aed90d2c1..027958b178 100644 --- a/consensus/types/src/validator.rs +++ b/consensus/types/src/validator.rs @@ -249,7 +249,6 @@ impl Validator { } } - /// TODO(electra): refactor these functions and make it simpler.. this is a mess /// Returns `true` if the validator is partially withdrawable. fn is_partially_withdrawable_validator_capella(&self, balance: u64, spec: &ChainSpec) -> bool { self.has_eth1_withdrawal_credential(spec) diff --git a/lighthouse/environment/tests/testnet_dir/config.yaml b/lighthouse/environment/tests/testnet_dir/config.yaml index 34e42a61f6..3f72e2ea6c 100644 --- a/lighthouse/environment/tests/testnet_dir/config.yaml +++ b/lighthouse/environment/tests/testnet_dir/config.yaml @@ -87,9 +87,8 @@ DEPOSIT_CONTRACT_ADDRESS: 0x00000000219ab540356cBB839Cbe05303d7705Fa # Network # --------------------------------------------------------------- SUBNETS_PER_NODE: 2 -GOSSIP_MAX_SIZE: 10485760 +MAX_PAYLOAD_SIZE: 10485760 MIN_EPOCHS_FOR_BLOCK_REQUESTS: 33024 -MAX_CHUNK_SIZE: 10485760 TTFB_TIMEOUT: 5 RESP_TIMEOUT: 10 MESSAGE_DOMAIN_INVALID_SNAPPY: 0x00000000