diff --git a/Cargo.lock b/Cargo.lock index 9a0b97d2e0..b9b1a12ad1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2157,6 +2157,15 @@ dependencies = [ "types", ] +[[package]] +name = "erased-serde" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f2b0c2380453a92ea8b6c8e5f64ecaafccddde8ceab55ff7a8ac1029f894569" +dependencies = [ + "serde", +] + [[package]] name = "errno" version = "0.3.0" @@ -7447,6 +7456,9 @@ name = "slog" version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8347046d4ebd943127157b94d63abb990fcf729dc4e9978927fdf4ac3c998d06" +dependencies = [ + "erased-serde", +] [[package]] name = "slog-async" diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 99ecdaa4c9..84fa937f90 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -137,6 +137,40 @@ impl DataAvailabilityChecker { } } + pub fn has_block(&self, block_root: &Hash256) -> bool { + self.availability_cache + .read() + .get(block_root) + .map_or(false, |cache| cache.executed_block.is_some()) + } + + pub fn get_missing_blob_ids(&self, block_root: &Hash256) -> Vec { + let epoch = self.slot_clock.now().map(|s| s.epoch(T::slots_per_epoch())); + if epoch.map_or(false, |e| self.da_check_required(e)) { + self.availability_cache + .read() + .get(block_root) + .map_or(vec![], |cache| { + if let Some(block) = cache.executed_block.as_ref() { + block.get_filtered_blob_ids(|i, blob_root| { + cache.verified_blobs.get(i).is_none() + }) + } else { + let mut blob_ids = Vec::with_capacity(T::max_blobs_per_block()); + for i in 0..T::max_blobs_per_block() { + blob_ids.push(BlobIdentifier { + block_root: *block_root, + index: i as u64, + }); + } + blob_ids + } + }) + } else { + vec![] + } + } + pub fn wrap_block( &self, block_root: Hash256, diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index da20d44f0b..f830b194ec 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -12,6 +12,7 @@ use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; use std::sync::Arc; use std::time::Duration; +use store::FixedVector; use tokio::sync::mpsc::UnboundedSender; use tree_hash::TreeHash; use types::{ @@ -77,8 +78,11 @@ pub async fn publish_block( PubsubMessage::BlobSidecar(Box::new((blob_index as u64, blob))), )?; } - let blobs = signed_blobs.into_iter().map(|blob| blob.message).collect(); - BlockWrapper::BlockAndBlobs(block, blobs) + let blobs = signed_blobs + .into_iter() + .map(|blob| Some(blob.message)) + .collect::>(); + BlockWrapper::BlockAndBlobs(block, FixedVector::from(blobs)) } else { block.into() } diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index d068a20079..9025ee6d40 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -19,7 +19,7 @@ store = { path = "../store" } lighthouse_network = { path = "../lighthouse_network" } types = { path = "../../consensus/types" } slot_clock = { path = "../../common/slot_clock" } -slog = { version = "2.5.2", features = ["max_level_trace"] } +slog = { version = "2.5.2", features = ["max_level_trace", "nested-values"] } hex = "0.4.2" eth2_ssz = "0.4.1" eth2_ssz_types = "0.2.2" diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 3cdb791083..4551930c63 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -911,6 +911,7 @@ impl Worker { ); return None; } + _ => todo!(), //TODO(sean) }; metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL); diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index cba272a35d..b402153e59 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -19,7 +19,7 @@ use slog::{debug, error, info, warn}; use ssz_types::FixedVector; use std::sync::Arc; use tokio::sync::mpsc; -use types::{BlobSidecar, Epoch, Hash256, SignedBeaconBlock}; +use types::{BlobSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] @@ -90,8 +90,6 @@ impl Worker { let slot = block.slot(); let parent_root = block.message().parent_root(); - // TODO(sean) check availability here and send information to sync? - let result = self .chain .process_block( @@ -151,9 +149,20 @@ impl Worker { seen_timestamp: Duration, process_type: BlockProcessType, ) { + let Some(slot) = blobs.iter().find_map(|blob|{ + if let Some(blob) = blob { + Some(blob.slot) + } else { + None + } + }) else { + return; + }; + let result = self .chain .check_availability_and_maybe_import( + slot, |chain| { chain .data_availability_checker @@ -167,7 +176,7 @@ impl Worker { self.send_sync_message(SyncMessage::BlockPartProcessed { process_type, result: result.into(), - response_type: ResponseType::Blobs, + response_type: ResponseType::Blob, }); } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 07e2039f98..b7028b3d8b 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -24,6 +24,7 @@ use super::{ }; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; +use crate::sync::block_lookups::single_block_lookup::LookupVerifyError; mod parent_lookup; mod single_block_lookup; @@ -121,14 +122,19 @@ impl BlockLookups { .single_block_lookups .iter_mut() .any(|(block_id, blob_id, single_block_request)| { - single_block_request.add_peer(&hash, &peer_id) + if single_block_request.requested_block_root == hash { + single_block_request.block_request_state.add_peer(&peer_id); + single_block_request.blob_request_state.add_peer(&peer_id); + return true; + } + false }) { return; } if self.parent_lookups.iter_mut().any(|parent_req| { - parent_req.add_block_peer(&hash, &peer_id) || parent_req.contains_block(&hash) + parent_req.add_peer(&hash, &peer_id) || parent_req.contains_block(&hash) }) { // If the block was already downloaded, or is being downloaded in this moment, do not // request it. @@ -187,7 +193,7 @@ impl BlockLookups { ) { self.search_block_with( |request| { - let _ = request.add_block_wrapper(block_root, block); + let _ = request.add_block_wrapper(block_root, block.clone()); }, block_root, peer_id, @@ -215,8 +221,7 @@ impl BlockLookups { // Make sure this block is not already downloaded, and that neither it or its parent is // being searched for. if self.parent_lookups.iter_mut().any(|parent_req| { - parent_req.contains_block(&block_root) - || parent_req.add_block_peer(&block_root, &peer_id) + parent_req.contains_block(&block_root) || parent_req.add_peer(&block_root, &peer_id) }) { // we are already searching for this block, ignore it return; @@ -247,62 +252,57 @@ impl BlockLookups { cx: &mut SyncNetworkContext, ) { let stream_terminator = block.is_none().into(); + let log = self.log.clone(); - let Some((request_id_ref, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Block) else { + let Some((triggered_parent_request, request_id_ref, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Block) else { return; }; - if let Err(error) = request_ref.verify_block(block).and_then(|root_block_opt| { - if let Some((root, block)) = root_block_opt { - // Only send for processing if we don't have parent requests that were triggered by - // this block. - let triggered_parent_request = self - .parent_lookups - .iter() - .any(|lookup| lookup.chain_hash() == root); - + let should_remove = match request_ref.verify_block(block) { + Ok(Some((root, block))) => { if triggered_parent_request { // The lookup status here is irrelevant because we wait until the parent chain // is complete before processing the block. - let _ = request_ref.add_block(root, block)?; + if let Err(e) = request_ref.add_block(root, block) { + Self::handle_block_lookup_verify_error( + id, + peer_id, + cx, + request_id_ref, + request_ref, + e, + &log, + ) + } else { + false + } } else { // This is the correct block, send it for processing - if self - .send_block_for_processing( - root, - BlockWrapper::Block(block), - seen_timestamp, - BlockProcessType::SingleBlock { id }, - cx, - ) - .is_err() - { - // Remove to avoid inconsistencies - self.single_block_lookups - .retain(|(block_id, _, _)| block_id != &Some(id)); - } + self.send_block_for_processing( + root, + BlockWrapper::Block(block), + seen_timestamp, + BlockProcessType::SingleBlock { id }, + cx, + ) + .is_err() } } + Ok(None) => false, + Err(e) => Self::handle_blob_lookup_verify_error( + id, + peer_id, + cx, + request_id_ref, + request_ref, + e, + &log, + ), + }; - Ok(()) - }) { - let msg: &str = error.into(); - cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); - - debug!(self.log, "Single block lookup failed"; - "peer_id" => %peer_id, "error" => msg, "block_root" => %request_ref.requested_block_root); - // try the request again if possible - if let Ok((peer_id, request)) = request_ref.request_block() { - if let Ok(id) = cx.single_block_lookup_request(peer_id, request) { - *request_id_ref = id; - } else { - self.single_block_lookups - .retain(|(block_id, _, _)| block_id != &Some(id)); - } - } else { - self.single_block_lookups - .retain(|(block_id, _, _)| block_id != &Some(id)); - } + if should_remove { + self.single_block_lookups + .retain(|(block_id, _, _)| block_id != &Some(id)); } metrics::set_gauge( @@ -321,60 +321,58 @@ impl BlockLookups { ) { let stream_terminator = blob.is_none().into(); - let Some((request_id_ref, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Blob) else { + let log = self.log.clone(); + + let Some((triggered_parent_request, request_id_ref, request_ref)) = + self.find_single_lookup_request(id, stream_terminator, ResponseType::Blob) else { return; }; - if let Err(error) = request_ref.verify_blob(blob).and_then(|root_blobs_opt| { - if let Some((block_root, blobs)) = root_blobs_opt { - // Only send for processing if we don't have parent requests that were triggered by - // this block. - let triggered_parent_request = self - .parent_lookups - .iter() - .any(|lookup| lookup.chain_hash() == block_root); - + let should_remove = match request_ref.verify_blob(blob) { + Ok(Some((block_root, blobs))) => { if triggered_parent_request { // The lookup status here is irrelevant because we wait until the parent chain // is complete before processing the block. - let _ = request_ref.add_blobs(block_root, blobs)?; + if let Err(e) = request_ref.add_blobs(block_root, blobs) { + Self::handle_blob_lookup_verify_error( + id, + peer_id, + cx, + request_id_ref, + request_ref, + e, + &log, + ) + } else { + false + } } else { // These are the correct blobs, send them for processing - if self - .send_blobs_for_processing( - block_root, - blobs, - seen_timestamp, - BlockProcessType::SingleBlock { id }, - cx, - ) - .is_err() - { - // Remove to avoid inconsistencies - self.single_block_lookups - .retain(|(_, blob_id, _)| blob_id != &Some(id)); - } + self.send_blobs_for_processing( + block_root, + blobs, + seen_timestamp, + BlockProcessType::SingleBlock { id }, + cx, + ) + .is_err() } } - Ok(()) - }) { - let msg: &str = error.into(); - cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); + Ok(None) => false, + Err(e) => Self::handle_blob_lookup_verify_error( + id, + peer_id, + cx, + request_id_ref, + request_ref, + e, + &log, + ), + }; - debug!(self.log, "Single block lookup failed"; - "peer_id" => %peer_id, "error" => msg, "block_root" => %request_ref.requested_block_root); - // try the request again if possible - if let Ok((peer_id, request)) = request_ref.request_blobs() { - if let Ok(id) = cx.single_blobs_lookup_request(peer_id, request) { - *request_id_ref = id; - } else { - self.single_block_lookups - .retain(|(_, blob_id, _)| blob_id != &Some(id)); - } - } else { - self.single_block_lookups - .retain(|(_, blob_id, _)| blob_id != &Some(id)); - } + if should_remove { + self.single_block_lookups + .retain(|(_, blob_id, _)| blob_id != &Some(id)); } metrics::set_gauge( @@ -383,16 +381,89 @@ impl BlockLookups { ); } + //TODO(sean) reduce duplicate code + fn handle_block_lookup_verify_error( + id: Id, + peer_id: PeerId, + cx: &mut SyncNetworkContext, + request_id_ref: &mut Id, + request_ref: &mut SingleBlockLookup<3, T>, + error: LookupVerifyError, + log: &Logger, + ) -> bool { + let requested_block_root = request_ref.requested_block_root; + + let msg: &str = error.into(); + cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); + + debug!(log, "Single block lookup failed"; + "peer_id" => %peer_id, "error" => msg, "block_root" => ?requested_block_root); + // try the request again if possible + match request_ref.request_block() { + Ok(Some((peer_id, request))) => { + if let Ok(id) = cx.single_block_lookup_request(peer_id, request) { + *request_id_ref = id; + } else { + return true; + } + } + Ok(None) => {} + Err(e) => { + debug!(log, "Single block lookup failed"; + "peer_id" => %peer_id, "error" => ?e, "block_root" => %requested_block_root); + return true; + } + } + false + } + + fn handle_blob_lookup_verify_error( + id: Id, + peer_id: PeerId, + cx: &mut SyncNetworkContext, + request_id_ref: &mut Id, + request_ref: &mut SingleBlockLookup<3, T>, + error: LookupVerifyError, + log: &Logger, + ) -> bool { + let requested_block_root = request_ref.requested_block_root; + + let msg: &str = error.into(); + cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); + + debug!(log, "Single block lookup failed"; + "peer_id" => %peer_id, "error" => msg, "block_root" => ?requested_block_root); + // try the request again if possible + match request_ref.request_blobs() { + Ok(Some((peer_id, request))) => { + if let Ok(id) = cx.single_blobs_lookup_request(peer_id, request) { + *request_id_ref = id; + } else { + return true; + } + } + Ok(None) => {} + Err(e) => { + debug!(log, "Single block lookup failed"; + "peer_id" => %peer_id, "error" => ?e, "block_root" => %requested_block_root); + return true; + } + } + false + } + fn find_single_lookup_request( &mut self, target_id: Id, stream_terminator: StreamTerminator, response_type: ResponseType, ) -> Option<( + bool, &mut Id, &mut SingleBlockLookup, )> { let lookup: Option<( + bool, &mut Id, &mut SingleBlockLookup, )> = self @@ -404,27 +475,36 @@ impl BlockLookups { ResponseType::Blob => blob_id_opt, }; if let Some(lookup_id) = id_opt { - if lookup_id == target_id { - Some((lookup_id, req)) + if *lookup_id == target_id { + // Only send for processing if we don't have parent requests that were triggered by + // this block. + let triggered_parent_request = self + .parent_lookups + .iter() + .any(|lookup| lookup.chain_hash() == req.requested_block_root); + + return Some((triggered_parent_request, lookup_id, req)); } } None }); - let (id_ref, request) = match lookup { - Some((id_ref, req)) => (id_ref, req), + let (triggered_parent_request, id_ref, request) = match lookup { + Some((triggered_parent_request, id_ref, req)) => { + (triggered_parent_request, id_ref, req) + } None => { if matches!(StreamTerminator::False, stream_terminator) { debug!( self.log, "Block returned for single block lookup not present"; - "response_type" => response_type, + "response_type" => ?response_type, ); } return None; } }; - Some((id_ref, request)) + Some((triggered_parent_request, id_ref, request)) } /// Process a response received from a parent lookup request. @@ -451,7 +531,7 @@ impl BlockLookups { match parent_lookup.verify_block(block, &mut self.failed_chains) { Ok(Some((block_root, block))) => { - let process_or_search = parent_lookup.add_block(block_root, block); + let process_or_search = parent_lookup.add_block(block_root, block).unwrap(); //TODO(sean) fix match process_or_search { LookupDownloadStatus::Process(wrapper) => { let chain_hash = parent_lookup.chain_hash(); @@ -486,7 +566,7 @@ impl BlockLookups { | ParentVerifyError::UnrequestedBlobId | ParentVerifyError::ExtraBlobsReturned | ParentVerifyError::InvalidIndex(_) - | ParentVerifyError::AvailabilityCheck(_) => { + | ParentVerifyError::AvailabilityCheck => { let e = e.into(); warn!(self.log, "Peer sent invalid response to parent request."; "peer_id" => %peer_id, "reason" => %e); @@ -545,7 +625,7 @@ impl BlockLookups { match parent_lookup.verify_blob(blob, &mut self.failed_chains) { Ok(Some((block_root, blobs))) => { - let processed_or_search = parent_lookup.add_blobs(block_root, blobs); + let processed_or_search = parent_lookup.add_blobs(block_root, blobs).unwrap(); //TODO(sean) fix match processed_or_search { LookupDownloadStatus::Process(wrapper) => { let chain_hash = parent_lookup.chain_hash(); @@ -573,14 +653,14 @@ impl BlockLookups { // processing result arrives. self.parent_lookups.push(parent_lookup); } - Err(e) => match e.into() { + Err(e) => match e { ParentVerifyError::RootMismatch | ParentVerifyError::NoBlockReturned | ParentVerifyError::ExtraBlocksReturned | ParentVerifyError::UnrequestedBlobId | ParentVerifyError::ExtraBlobsReturned | ParentVerifyError::InvalidIndex(_) - | ParentVerifyError::AvailabilityCheck(_) => { + | ParentVerifyError::AvailabilityCheck => { let e = e.into(); warn!(self.log, "Peer sent invalid response to parent request."; "peer_id" => %peer_id, "reason" => %e); @@ -1127,25 +1207,35 @@ impl BlockLookups { .enumerate() .find(|(index, (_, _, req))| req.requested_block_root == chain_hash) { - self.single_block_lookups + if let Some((block_id, blob_id, block_wrapper)) = self + .single_block_lookups .get_mut(index) - .and_then(|(_, _, lookup)| lookup.get_downloaded_block()) - .map(|block_wrapper| { - // This is the correct block, send it for processing - if self - .send_block_for_processing( - chain_hash, - block_wrapper, - Duration::from_secs(0), //TODO(sean) pipe this through - BlockProcessType::SingleBlock { id }, - cx, - ) - .is_err() - { - // Remove to avoid inconsistencies - self.single_block_lookups.remove(index); - } - }); + .and_then(|(block_id, blob_id, lookup)| { + lookup + .get_downloaded_block() + .map(|block| (block_id, blob_id, block)) + }) + { + let Some(id) = block_id.or(*blob_id) else { + warn!(self.log, "No id found for single block lookup"; "chain_hash" => %chain_hash); + return; + }; + + // This is the correct block, send it for processing + if self + .send_block_for_processing( + chain_hash, + block_wrapper, + Duration::from_secs(0), //TODO(sean) pipe this through + BlockProcessType::SingleBlock { id }, + cx, + ) + .is_err() + { + // Remove to avoid inconsistencies + self.single_block_lookups.remove(index); + } + } } } BatchProcessResult::FaultyFailure { diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 23ce722bc1..2b095d58d6 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -5,8 +5,8 @@ use crate::sync::{ manager::{Id, SLOT_IMPORT_TOLERANCE}, network_context::SyncNetworkContext, }; +use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; -use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::BeaconChainTypes; @@ -48,7 +48,7 @@ pub enum ParentVerifyError { ExtraBlobsReturned, InvalidIndex(u64), PreviousFailure { parent_root: Hash256 }, - AvailabilityCheck(AvailabilityCheckError), + AvailabilityCheck, //TODO(sean) wrap the underlying error } #[derive(Debug, PartialEq, Eq)] @@ -107,11 +107,11 @@ impl ParentLookup { match cx.parent_lookup_block_request(peer_id, request) { Ok(request_id) => { self.current_parent_request_id = Some(request_id); - Ok(()) + return Ok(()); } Err(reason) => { self.current_parent_request_id = None; - Err(RequestError::SendFailed(reason)) + return Err(RequestError::SendFailed(reason)); } } } @@ -131,11 +131,11 @@ impl ParentLookup { match cx.parent_lookup_blobs_request(peer_id, request) { Ok(request_id) => { self.current_parent_blob_request_id = Some(request_id); - Ok(()) + return Ok(()); } Err(reason) => { self.current_parent_blob_request_id = None; - Err(RequestError::SendFailed(reason)) + return Err(RequestError::SendFailed(reason)); } } } @@ -161,15 +161,6 @@ impl ParentLookup { self.downloaded_blocks.push((current_root, block)); self.current_parent_request.requested_block_root = next_parent; - let mut blob_ids = Vec::with_capacity(T::EthSpec::max_blobs_per_block()); - for i in 0..T::EthSpec::max_blobs_per_block() { - blob_ids.push(BlobIdentifier { - block_root: current_root, - index: i as u64, - }); - } - - self.current_parent_request.requested_ids = blob_ids; self.current_parent_request.block_request_state.state = single_block_lookup::State::AwaitingDownload; self.current_parent_request.blob_request_state.state = @@ -312,15 +303,14 @@ impl ParentLookup { )>, ParentVerifyError, > { - let block_root = self.current_parent_request.requested_block_root; let blobs = self.current_parent_request.verify_blob(blob)?; // check if the parent of this block isn't in the failed cache. If it is, this chain should // be dropped and the peer downscored. if let Some(parent_root) = blobs .as_ref() - .and_then(|blobs| blobs.first()) - .map(|blob| blob.block_parent_root_id) + .and_then(|(_, blobs)| blobs.first()) + .and_then(|blob| blob.as_ref().map(|b| b.block_parent_root)) { if failed_chains.contains(&parent_root) { self.current_parent_request @@ -331,7 +321,7 @@ impl ParentLookup { } } - Ok((block_root, blobs)) + Ok(blobs) } pub fn get_block_processing_peer(&self, chain_hash: Hash256) -> Option { @@ -361,28 +351,26 @@ impl ParentLookup { self.current_parent_request.failed_attempts() } - pub fn add_block_peer(&mut self, block_root: &Hash256, peer_id: &PeerId) -> bool { - self.current_parent_request.add_peer(block_root, peer_id) + //TODO(sean) fix this up + pub fn add_peer(&mut self, block_root: &Hash256, peer_id: &PeerId) -> bool { + if block_root == &self.chain_hash { + return false; + } + self.current_parent_request + .block_request_state + .add_peer(peer_id); + self.current_parent_request + .blob_request_state + .add_peer(peer_id); + true } + //TODO(sean) fix this up pub fn used_block_peers(&self) -> impl Iterator + '_ { - self.current_parent_request.used_peers.iter() - } - - #[cfg(test)] - pub fn failed_blob_attempts(&self) -> u8 { - self.current_parent_blob_request - .map_or(0, |req| req.failed_attempts()) - } - - pub fn add_blobs_peer(&mut self, blobs: &[BlobIdentifier], peer_id: &PeerId) -> bool { - self.current_parent_blob_request - .map_or(false, |mut req| req.add_peer(blobs, peer_id)) - } - - pub fn used_blob_peers(&self) -> impl Iterator + '_ { - self.current_parent_blob_request - .map_or(iter::empty(), |req| req.used_peers.iter()) + self.current_parent_request + .block_request_state + .used_peers + .iter() } } @@ -396,7 +384,7 @@ impl From for ParentVerifyError { E::UnrequestedBlobId => ParentVerifyError::UnrequestedBlobId, E::ExtraBlobsReturned => ParentVerifyError::ExtraBlobsReturned, E::InvalidIndex(index) => ParentVerifyError::InvalidIndex(index), - E::AvailabilityCheck(e) => ParentVerifyError::AvailabilityCheck(e), + E::AvailabilityCheck => ParentVerifyError::AvailabilityCheck, } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 2cbaeb5ffb..caef901191 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -30,7 +30,7 @@ pub struct SingleBlockLookup { /// Object representing a single block lookup request. /// //previously assumed we would have a single block. Now we may have the block but not the blobs -#[derive(PartialEq, Eq)] +#[derive(PartialEq, Eq, Debug)] pub struct SingleLookupRequestState { /// State of this request. pub state: State, @@ -59,7 +59,7 @@ pub enum LookupVerifyError { UnrequestedBlobId, ExtraBlobsReturned, InvalidIndex(u64), - AvailabilityCheck(AvailabilityCheckError), + AvailabilityCheck, //TODO(sean) wrap the underlying error } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] @@ -112,17 +112,17 @@ impl SingleBlockLookup Ok(LookupDownloadStatus::Process(wrapper)), Err(AvailabilityCheckError::MissingBlobs) => { Ok(LookupDownloadStatus::SearchBlock(block_root)) } - Err(e) => Err(LookupVerifyError::AvailabilityCheck(e)), + Err(_e) => Err(LookupVerifyError::AvailabilityCheck), } } else { Ok(LookupDownloadStatus::SearchBlock(block_root)) @@ -135,17 +135,17 @@ impl SingleBlockLookup>, ) -> Result, LookupVerifyError> { //TODO(sean) check for existing block? - self.downloaded_block = Some(block); + self.downloaded_block = Some(block.clone()); match self .da_checker - .wrap_block(block_root, block, self.downloaded_blobs) + .wrap_block(block_root, block, self.downloaded_blobs.clone()) { Ok(wrapper) => Ok(LookupDownloadStatus::Process(wrapper)), Err(AvailabilityCheckError::MissingBlobs) => { Ok(LookupDownloadStatus::SearchBlock(block_root)) } - Err(e) => LookupVerifyError::AvailabilityCheck(e), + Err(_e) => Err(LookupVerifyError::AvailabilityCheck), } } @@ -193,7 +193,7 @@ impl SingleBlockLookup { - self.register_failure_downloading(); + self.block_request_state.register_failure_downloading(); Err(LookupVerifyError::NoBlockReturned) } }, @@ -216,12 +216,13 @@ impl SingleBlockLookup>>, ) -> Result< - Option< + Option<( + Hash256, FixedVector< Option>>, <::EthSpec as EthSpec>::MaxBlobsPerBlock, >, - >, + )>, LookupVerifyError, > { match self.block_request_state.state { @@ -237,9 +238,13 @@ impl SingleBlockLookup SingleBlockLookup { self.blob_request_state.state = State::Processing { peer_id }; - Ok(Some(self.downloaded_blobs.clone())) + Ok(Some(( + self.requested_block_root, + self.downloaded_blobs.clone(), + ))) } }, State::Processing { peer_id: _ } => match blob { @@ -268,7 +276,8 @@ impl SingleBlockLookup Result, LookupRequestError> { - if self.da_checker.has_block(self.requested_block_root) || self.downloaded_block.is_some() { + if self.da_checker.has_block(&self.requested_block_root) || self.downloaded_block.is_some() + { return Ok(None); } @@ -276,7 +285,7 @@ impl SingleBlockLookup= MAX_ATTEMPTS { + if self.block_request_state.failed_attempts() >= MAX_ATTEMPTS { Err(LookupRequestError::TooManyAttempts { cannot_process: self.block_request_state.failed_processing >= self.block_request_state.failed_downloading, @@ -301,15 +310,18 @@ impl SingleBlockLookup Result, LookupRequestError> { - if self.da_checker.has_block(self.requested_block_root) || self.downloaded_block.is_some() { + let missing_ids = self + .da_checker + .get_missing_blob_ids(&self.requested_block_root); + if missing_ids.is_empty() || self.downloaded_block.is_some() { return Ok(None); } debug_assert!(matches!( - self.block_request_state.state, + self.blob_request_state.state, State::AwaitingDownload )); - if self.failed_attempts() >= MAX_ATTEMPTS { + if self.blob_request_state.failed_attempts() >= MAX_ATTEMPTS { Err(LookupRequestError::TooManyAttempts { cannot_process: self.blob_request_state.failed_processing >= self.blob_request_state.failed_downloading, @@ -321,7 +333,7 @@ impl SingleBlockLookup slog::Value serializer: &mut dyn slog::Serializer, ) -> slog::Result { serializer.emit_str("request", key)?; - serializer.emit_arguments("hash", &format_args!("{}", self.requested_thing))?; + serializer.emit_arguments("hash", &format_args!("{}", self.requested_block_root))?; + serializer.emit_arguments("blob_ids", &format_args!("{:?}", self.requested_ids))?; + serializer.emit_arguments( + "block_request_state", + &format_args!("{:?}", self.block_request_state), + )?; + serializer.emit_arguments( + "blob_request_state", + &format_args!("{:?}", self.blob_request_state), + )?; + slog::Result::Ok(()) + } +} + +impl slog::Value for SingleLookupRequestState { + fn serialize( + &self, + record: &slog::Record, + key: slog::Key, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + serializer.emit_str("request_state", key)?; match &self.state { State::AwaitingDownload => { "awaiting_download".serialize(record, "state", serializer)? diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 25b20aa37f..9a7250cfbd 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -60,7 +60,7 @@ impl BlocksAndBlobsRequestInfo { for blob in blob_list { let blob_index = blob.index as usize; if blob_index >= T::max_blobs_per_block() { - return Err(format!("Invalid blob index {blob_index:?}").as_str()); + return Err("Invalid blob index"); } blobs_fixed.insert(blob_index, Some(blob)); } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 22820b496d..8e77998cce 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -255,6 +255,8 @@ pub fn spawn( log: log.clone(), }; + let log_clone = log.clone(); + let sync_send_clone = sync_send.clone(); executor.spawn( async move { let slot_duration = beacon_chain.slot_clock.slot_duration(); @@ -299,9 +301,9 @@ pub fn spawn( ); // spawn the sync manager thread - debug!(log, "Sync Manager started"); + debug!(log_clone, "Sync Manager started"); executor.spawn(async move { Box::pin(sync_manager.main()).await }, "sync"); - sync_send + sync_send_clone } impl SyncManager { @@ -660,7 +662,9 @@ impl SyncManager { } }; - if block.slot() == slot { + let block_slot = block.slot(); + + if block_slot == slot { if let Err(e) = self .delayed_lookups .try_send(SyncMessage::UnknownBlock(peer_id, block, block_root)) @@ -676,7 +680,7 @@ impl SyncManager { ); } self.block_lookups.search_parent( - block.slot(), + block_slot, block_root, parent_root, peer_id, diff --git a/consensus/types/src/eth_spec.rs b/consensus/types/src/eth_spec.rs index 03b767a17b..b89f3e50e0 100644 --- a/consensus/types/src/eth_spec.rs +++ b/consensus/types/src/eth_spec.rs @@ -105,7 +105,7 @@ pub trait EthSpec: /* * New in Deneb */ - type MaxBlobsPerBlock: Unsigned + Clone + Sync + Send + Debug + PartialEq; + type MaxBlobsPerBlock: Unsigned + Clone + Sync + Send + Debug + PartialEq + Unpin; type FieldElementsPerBlob: Unsigned + Clone + Sync + Send + Debug + PartialEq; type BytesPerFieldElement: Unsigned + Clone + Sync + Send + Debug + PartialEq; /*