diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 1e775000b1..345a402bd9 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -956,13 +956,7 @@ impl BeaconChain { pub async fn get_block_and_blobs_checking_early_attester_cache( &self, block_root: &Hash256, - ) -> Result< - Option<( - Arc>, - Arc>, - )>, - Error, - > { + ) -> Result>, Error> { // If there is no data availability boundary, the Eip4844 fork is disabled. if let Some(finalized_data_availability_boundary) = self.finalized_data_availability_boundary() @@ -972,13 +966,19 @@ impl BeaconChain { self.early_attester_cache.get_block(*block_root), self.early_attester_cache.get_blobs(*block_root), ) { - Ok(Some((block, blobs))) + Ok(Some(SignedBeaconBlockAndBlobsSidecar { + beacon_block: block, + blobs_sidecar: blobs, + })) // Attempt to get the block and blobs from the database } else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) { let blobs = self .get_blobs(block_root, finalized_data_availability_boundary)? .map(Arc::new); - Ok(blobs.map(|blobs| (block, blobs))) + Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar { + beacon_block: block, + blobs_sidecar: blobs, + })) } else { Ok(None) } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 451c31668b..106722fd8d 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -14,8 +14,9 @@ use slog::{debug, error}; use slot_clock::SlotClock; use std::sync::Arc; use task_executor::TaskExecutor; +use tokio::count; use types::light_client_bootstrap::LightClientBootstrap; -use types::{Epoch, EthSpec, Hash256, SignedBeaconBlockAndBlobsSidecar, Slot}; +use types::{Epoch, EthSpec, Hash256, Slot}; use super::Worker; @@ -198,7 +199,7 @@ impl Worker { "Received BlocksByRoot Request"; "peer" => %peer_id, "requested" => request.block_roots.len(), - "returned" => %send_block_count + "returned" => send_block_count ); // send stream termination @@ -230,13 +231,10 @@ impl Worker { .get_block_and_blobs_checking_early_attester_cache(root) .await { - Ok(Some((block, blobs))) => { + Ok(Some(block_and_blobs)) => { self.send_response( peer_id, - Response::BlobsByRoot(Some(SignedBeaconBlockAndBlobsSidecar { - beacon_block: block, - blobs_sidecar: blobs, - })), + Response::BlobsByRoot(Some(block_and_blobs)), request_id, ); send_block_count += 1; @@ -253,7 +251,7 @@ impl Worker { error!( self.log, "No blobs in the store for block root"; - "request" => %request, + "request" => ?request, "peer" => %peer_id, "block_root" => ?root ); @@ -333,7 +331,7 @@ impl Worker { "Received BlobsByRoot Request"; "peer" => %peer_id, "requested" => request.block_roots.len(), - "returned" => %send_block_count + "returned" => send_block_count ); // send stream termination @@ -430,13 +428,18 @@ impl Worker { ) { debug!(self.log, "Received BlocksByRange Request"; "peer_id" => %peer_id, - "count" => %req.count, - "start_slot" => %req.start_slot, + "count" => req.count, + "start_slot" => req.start_slot, ); // Should not send more than max request blocks if req.count > MAX_REQUEST_BLOCKS { - req.count = MAX_REQUEST_BLOCKS; + return self.send_error_response( + peer_id, + RPCResponseErrorCode::InvalidRequest, + "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`".into(), + request_id, + ); } let forwards_block_root_iter = match self @@ -451,8 +454,8 @@ impl Worker { }, )) => { debug!(self.log, "Range request failed during backfill"; - "requested_slot" => %slot, - "oldest_known_slot" => %oldest_block_slot + "requested_slot" => slot, + "oldest_known_slot" => oldest_block_slot ); return self.send_error_response( peer_id, @@ -463,7 +466,7 @@ impl Worker { } Err(e) => { return error!(self.log, "Unable to obtain root iter"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "error" => ?e ) @@ -501,7 +504,7 @@ impl Worker { Ok(block_roots) => block_roots, Err(e) => { return error!(self.log, "Error during iteration over blocks"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "error" => ?e ) @@ -537,7 +540,7 @@ impl Worker { error!( self.log, "Block in the chain is not in the store"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "request_root" => ?root ); @@ -564,7 +567,7 @@ impl Worker { error!( self.log, "Error fetching block for peer"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "block_root" => ?root, "error" => ?e @@ -594,20 +597,20 @@ impl Worker { "BlocksByRange outgoing response processed"; "peer" => %peer_id, "msg" => "Failed to return all requested blocks", - "start_slot" => %req.start_slot, - "current_slot" => %current_slot, - "requested" => %req.count, - "returned" => %blocks_sent + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blocks_sent ); } else { debug!( self.log, "BlocksByRange outgoing response processed"; "peer" => %peer_id, - "start_slot" => %req.start_slot, - "current_slot" => %current_slot, - "requested" => %req.count, - "returned" => %blocks_sent + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blocks_sent ); } @@ -637,12 +640,20 @@ impl Worker { ) { debug!(self.log, "Received BlobsByRange Request"; "peer_id" => %peer_id, - "count" => %req.count, - "start_slot" => %req.start_slot, + "count" => req.count, + "start_slot" => req.start_slot, ); - let start_slot = Slot::from(req.start_slot); - let start_epoch = start_slot.epoch(T::EthSpec::slots_per_epoch()); + // Should not send more than max request blocks + if req.count > MAX_REQUEST_BLOBS_SIDECARS { + return self.send_error_response( + peer_id, + RPCResponseErrorCode::InvalidRequest, + "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`".into(), + request_id, + ); + } + let data_availability_boundary = match self.chain.data_availability_boundary() { Some(boundary) => boundary, None => { @@ -657,31 +668,47 @@ impl Worker { } }; + let start_slot = Slot::from(req.start_slot); + let start_epoch = start_slot.epoch(T::EthSpec::slots_per_epoch()); + + // If the peer requests data from beyond the data availability boundary we altruistically + // cap to the right time range. let serve_blobs_from_slot = if start_epoch < data_availability_boundary { + // Attempt to serve from the earliest block in our database, falling back to the data + // availability boundary let oldest_blob_slot = self .chain .store .get_blob_info() - .map(|blob_info| blob_info.oldest_blob_slot); + .map(|blob_info| blob_info.oldest_blob_slot) + .unwrap_or(data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch())); debug!( self.log, "Range request start slot is older than data availability boundary"; - "requested_slot" => %req.start_slot, + "requested_slot" => req.start_slot, "oldest_known_slot" => oldest_blob_slot, "data_availability_boundary" => data_availability_boundary ); - data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch()) + // Check if the request is entirely out of the data availability period. The + // `oldest_blob_slot` is the oldest slot in the database, so includes a margin of error + // controlled by our prune margin. + let end_request_slot = start_slot + count; + if oldest_blob_slot < end_request_slot { + return self.send_error_response( + peer_id, + RPCResponseErrorCode::InvalidRequest, + "Request outside of data availability period".into(), + request_id, + ); + } + std::cmp::max(oldest_blob_slot, start_slot) } else { start_slot }; - // Should not send more than max request blocks - if req.count > MAX_REQUEST_BLOBS_SIDECARS { - req.count = MAX_REQUEST_BLOBS_SIDECARS; - } - + // If the peer requests data from beyond the data availability boundary we altruistically cap to the right time range let forwards_block_root_iter = match self.chain.forwards_iter_block_roots(serve_blobs_from_slot) { Ok(iter) => iter, @@ -692,8 +719,8 @@ impl Worker { }, )) => { debug!(self.log, "Range request failed during backfill"; - "requested_slot" => %slot, - "oldest_known_slot" => %oldest_block_slot + "requested_slot" => slot, + "oldest_known_slot" => oldest_block_slot ); return self.send_error_response( peer_id, @@ -704,7 +731,7 @@ impl Worker { } Err(e) => { return error!(self.log, "Unable to obtain root iter"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "error" => ?e ) @@ -742,7 +769,7 @@ impl Worker { Ok(block_roots) => block_roots, Err(e) => { return error!(self.log, "Error during iteration over blocks"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "error" => ?e ) @@ -769,7 +796,7 @@ impl Worker { error!( self.log, "No blobs or block in the store for block root"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "block_root" => ?root ); @@ -779,7 +806,7 @@ impl Worker { error!( self.log, "No blobs in the store for block root"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "block_root" => ?root ); @@ -796,7 +823,7 @@ impl Worker { error!( self.log, "Error fetching blinded block for block root"; - "request" => %req, + "request" => ?req, "peer" => %peer_id, "block_root" => ?root, "error" => ?e @@ -824,20 +851,20 @@ impl Worker { "BlobsByRange Response processed"; "peer" => %peer_id, "msg" => "Failed to return all requested blobs", - "start_slot" => %req.start_slot, - "current_slot" => %current_slot, - "requested" => %req.count, - "returned" => %blobs_sent + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blobs_sent ); } else { debug!( self.log, "BlobsByRange Response processed"; "peer" => %peer_id, - "start_slot" => %req.start_slot, - "current_slot" => %current_slot, - "requested" => %req.count, - "returned" => %blobs_sent + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blobs_sent ); }