From 22ed36bc6a58a768c0baaa7e80cf44647cea73ee Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 16 Dec 2022 15:16:17 -0500 Subject: [PATCH 1/5] fix is_empty check --- beacon_node/network/src/sync/network_context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 94801aa871..5a96e19245 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -50,7 +50,7 @@ impl BlockBlobRequestInfo { } pub fn pop_response(&mut self) -> Option> { - if !self.accumulated_blocks.is_empty() && !self.accumulated_blocks.is_empty() { + if !self.accumulated_blocks.is_empty() && !self.accumulated_sidecars.is_empty() { let beacon_block = self.accumulated_blocks.pop_front().expect("non empty"); let blobs_sidecar = self.accumulated_sidecars.pop_front().expect("non empty"); return Some(SignedBeaconBlockAndBlobsSidecar { From 3ab0f46077db6ea6c9424366273a45299401f70d Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 19 Dec 2022 12:27:31 -0500 Subject: [PATCH 2/5] Update beacon_node/http_api/src/publish_blocks.rs Co-authored-by: Divma <26765164+divagant-martian@users.noreply.github.com> --- beacon_node/http_api/src/publish_blocks.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index c2b30d95f8..085f5036f4 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -32,7 +32,7 @@ pub async fn publish_block( let block_root = block_root.unwrap_or_else(|| block.canonical_root()); // Send the block, regardless of whether or not it is valid. The API - // specification is very clear that this isa the desired behaviour. + // specification is very clear that this is the desired behaviour. 
let wrapped_block = if matches!(block.as_ref(), &SignedBeaconBlock::Eip4844(_)) { if let Some(sidecar) = chain.blob_cache.pop(&block_root) { let block_and_blobs = SignedBeaconBlockAndBlobsSidecar { From db29cb08a6c73a37f5296d5bceffc182be56d234 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 21 Dec 2022 00:18:05 +1100 Subject: [PATCH 3/5] Add bootnode binary variable in testnet scripts --- scripts/local_testnet/el_bootnode.sh | 2 +- scripts/local_testnet/vars.env | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/local_testnet/el_bootnode.sh b/scripts/local_testnet/el_bootnode.sh index 1f96bce2c2..b22812dd93 100755 --- a/scripts/local_testnet/el_bootnode.sh +++ b/scripts/local_testnet/el_bootnode.sh @@ -1,4 +1,4 @@ priv_key="02fd74636e96a8ffac8e7b01b0de8dea94d6bcf4989513b38cf59eb32163ff91" -/home/sean/CLionProjects/eip4844-interop/geth/go-ethereum/build/bin/bootnode --nodekeyhex $priv_key \ No newline at end of file +$BOOTNODE_BINARY --nodekeyhex $priv_key \ No newline at end of file diff --git a/scripts/local_testnet/vars.env b/scripts/local_testnet/vars.env index dbfc41e91f..9a7c22ea58 100644 --- a/scripts/local_testnet/vars.env +++ b/scripts/local_testnet/vars.env @@ -1,4 +1,5 @@ GETH_BINARY=geth +BOOTNODE_BINARY=bootnode # Base directories for the validator keys and secrets DATADIR=~/.lighthouse/local-testnet From c76e371559c1d057aac9b22cbb47e2689ef7af7e Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 21 Dec 2022 00:29:15 +1100 Subject: [PATCH 4/5] Add missing source --- scripts/local_testnet/el_bootnode.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/local_testnet/el_bootnode.sh b/scripts/local_testnet/el_bootnode.sh index b22812dd93..1b8834b890 100755 --- a/scripts/local_testnet/el_bootnode.sh +++ b/scripts/local_testnet/el_bootnode.sh @@ -1,4 +1,5 @@ priv_key="02fd74636e96a8ffac8e7b01b0de8dea94d6bcf4989513b38cf59eb32163ff91" +source ./vars.env $BOOTNODE_BINARY --nodekeyhex $priv_key \ No newline at end of file 
From 33d01a79119fcb85d48888c98dbf59b6e6c1cd3c Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 21 Dec 2022 15:50:51 -0500 Subject: [PATCH 5/5] miscellaneous fixes on syncing, rpc and responding to peer's sync related requests (#3827) - there was a bug in responding range blob requests where we would incorrectly label the first slot of an epoch as a non-skipped slot if it were skipped. this bug did not exist in the code for responding to block range request because the logic error was mitigated by defensive coding elsewhere - there was a bug where a block received during range sync without a corresponding blob (and vice versa) was incorrectly interpreted as a stream termination - RPC size limit fixes. - Our blob cache was deadlocking so I removed use of it for now. - Because of our change in finalized sync batch size from 2 to 1 and our transition to using exact epoch boundaries for batches (rather than one slot past the epoch boundary), we need to sync finalized sync to 2 epochs + 1 slot past our peer's finalized slot in order to finalize the chain locally. 
- use fork context bytes in rpc methods on both the server and client side --- .../src/rpc/codec/ssz_snappy.rs | 7 +++-- .../lighthouse_network/src/rpc/protocol.rs | 26 ++++++++++++------- .../beacon_processor/worker/rpc_methods.rs | 26 ++++++++++++++++--- .../network/src/sync/network_context.rs | 25 ++++++++++++++++-- .../network/src/sync/range_sync/chain.rs | 8 +++++- beacon_node/store/src/hot_cold_store.rs | 6 ++--- 6 files changed, 78 insertions(+), 20 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index ce6e30ebf3..fb07e68313 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -298,8 +298,8 @@ impl Decoder for SSZSnappyOutboundCodec { .rpc_response_limits::(&self.fork_context); if ssz_limits.is_out_of_bounds(length, self.max_packet_size) { return Err(RPCError::InvalidData(format!( - "RPC response length is out of bounds, length {}", - length + "RPC response length is out of bounds, length {}, max {}, min {}", + length, ssz_limits.max, ssz_limits.min ))); } // Calculate worst case compression length for given uncompressed length @@ -439,6 +439,9 @@ fn context_bytes( SignedBeaconBlock::Base { .. } => Some(fork_context.genesis_context_bytes()), }; } + if let RPCResponse::BlobsByRange(_) | RPCResponse::BlobsByRoot(_) = rpc_variant { + return fork_context.to_context_bytes(ForkName::Eip4844); + } } } None diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 0773197e86..8a3149fc51 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -107,6 +107,12 @@ lazy_static! 
{ .as_ssz_bytes() .len(); + pub static ref BLOBS_SIDECAR_MIN: usize = BlobsSidecar::::empty().as_ssz_bytes().len(); + pub static ref BLOBS_SIDECAR_MAX: usize = BlobsSidecar::::max_size(); + + //FIXME(sean) these are underestimates + pub static ref SIGNED_BLOCK_AND_BLOBS_MIN: usize = *BLOBS_SIDECAR_MIN + *SIGNED_BEACON_BLOCK_BASE_MIN; + pub static ref SIGNED_BLOCK_AND_BLOBS_MAX: usize =*BLOBS_SIDECAR_MAX + *SIGNED_BEACON_BLOCK_EIP4844_MAX; } /// The maximum bytes that can be sent across the RPC pre-merge. @@ -358,11 +364,10 @@ impl ProtocolId { Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()), Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()), - - //FIXME(sean) add blob sizes - Protocol::BlobsByRange => rpc_block_limits_by_fork(fork_context.current_fork()), - Protocol::BlobsByRoot => rpc_block_limits_by_fork(fork_context.current_fork()), - + Protocol::BlobsByRange => RpcLimits::new(*BLOBS_SIDECAR_MIN, *BLOBS_SIDECAR_MAX), + Protocol::BlobsByRoot => { + RpcLimits::new(*SIGNED_BLOCK_AND_BLOBS_MIN, *SIGNED_BLOCK_AND_BLOBS_MAX) + } Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -381,13 +386,16 @@ impl ProtocolId { /// Returns `true` if the given `ProtocolId` should expect `context_bytes` in the /// beginning of the stream, else returns `false`. 
pub fn has_context_bytes(&self) -> bool { - if self.version == Version::V2 { - match self.message_name { + match self.version { + Version::V2 => match self.message_name { Protocol::BlocksByRange | Protocol::BlocksByRoot => return true, _ => return false, - } + }, + Version::V1 => match self.message_name { + Protocol::BlobsByRange | Protocol::BlobsByRoot => return true, + _ => return false, + }, } - false } } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 6eae7eed59..d85bb1f209 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -433,7 +433,17 @@ impl Worker { }; // Pick out the required blocks, ignoring skip-slots. - let mut last_block_root = None; + let mut last_block_root = req + .start_slot + .checked_sub(1) + .map(|prev_slot| { + self.chain + .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) + }) + .transpose() + .ok() + .flatten() + .flatten(); let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) // map skip slots to None @@ -602,7 +612,17 @@ impl Worker { }; // Pick out the required blocks, ignoring skip-slots. 
- let mut last_block_root = None; + let mut last_block_root = req + .start_slot + .checked_sub(1) + .map(|prev_slot| { + self.chain + .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) + }) + .transpose() + .ok() + .flatten() + .flatten(); let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) // map skip slots to None @@ -669,7 +689,7 @@ impl Worker { self.log, "BlobsByRange Response processed"; "peer" => %peer_id, - "msg" => "Failed to return all requested blocks", + "msg" => "Failed to return all requested blobs", "start_slot" => req.start_slot, "current_slot" => current_slot, "requested" => req.count, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 5917c7ecca..978bd69d06 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -314,14 +314,24 @@ impl SyncNetworkContext { let (chain_id, batch_id, info) = entry.get_mut(); let chain_id = chain_id.clone(); let batch_id = batch_id.clone(); + let stream_terminator = maybe_block.is_none(); info.add_block_response(maybe_block); - let maybe_block = info.pop_response().map(|block_sidecar_pair| { + let maybe_block_wrapped = info.pop_response().map(|block_sidecar_pair| { BlockWrapper::BlockAndBlob { block_sidecar_pair } }); + + if stream_terminator && !info.is_finished() { + return None; + } + if !stream_terminator && maybe_block_wrapped.is_none() { + return None; + } + if info.is_finished() { entry.remove(); } - Some((chain_id, batch_id, maybe_block)) + + Some((chain_id, batch_id, maybe_block_wrapped)) } Entry::Vacant(_) => None, } @@ -356,13 +366,24 @@ impl SyncNetworkContext { let (chain_id, batch_id, info) = entry.get_mut(); let chain_id = chain_id.clone(); let batch_id = batch_id.clone(); + let stream_terminator = maybe_sidecar.is_none(); 
info.add_sidecar_response(maybe_sidecar); let maybe_block = info .pop_response() .map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair }); + + if stream_terminator && !info.is_finished() { + return None; + } + + if !stream_terminator && maybe_block.is_none() { + return None; + } + if info.is_finished() { entry.remove(); } + Some((chain_id, batch_id, maybe_block)) } Entry::Vacant(_) => None, diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 199be788e7..46b6d05d7d 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -137,10 +137,16 @@ impl SyncingChain { let id = SyncingChain::::id(&target_head_root, &target_head_slot); + let target_slot = if is_finalized_segment { + target_head_slot + (2 * T::EthSpec::slots_per_epoch()) + 1 + } else { + target_head_slot + }; + SyncingChain { id, start_epoch, - target_head_slot, + target_head_slot: target_slot, target_head_root, batches: BTreeMap::new(), peers, diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 732795fce2..00aa0b2af1 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -503,9 +503,9 @@ impl, Cold: ItemStore> HotColdDB } pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { - if let Some(blobs) = self.blob_cache.lock().get(block_root) { - Ok(Some(blobs.clone())) - } else if let Some(bytes) = self + // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, + // may want to attempt to use one again + if let Some(bytes) = self .hot_db .get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? {