Reprocess an ExecutedBlock on unavailable blobs

This commit is contained in:
Pawan Dhananjay
2023-03-17 21:30:14 +05:30
parent 05db0d2ba3
commit acd36ccaa6
8 changed files with 320 additions and 33 deletions

View File

@@ -41,7 +41,7 @@ num_cpus = "1.13.0"
lru_cache = { path = "../../common/lru_cache" }
if-addrs = "0.6.4"
strum = "0.24.0"
tokio-util = { version = "0.6.3", features = ["time"] }
tokio-util = { version = "0.7.7", features = ["time"] }
derivative = "2.2.0"
delay_map = "0.1.1"
ethereum-types = { version = "0.14.1", optional = true }

View File

@@ -42,7 +42,9 @@ use crate::sync::manager::BlockProcessType;
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::parking_lot::Mutex;
use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock, NotifyExecutionLayer};
use beacon_chain::{
BeaconChain, BeaconChainTypes, ExecutedBlock, GossipVerifiedBlock, NotifyExecutionLayer,
};
use derivative::Derivative;
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
@@ -83,6 +85,8 @@ mod worker;
use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock;
pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage};
use self::work_reprocessing_queue::QueuedExecutedBlock;
/// The maximum size of the channel for work events to the `BeaconProcessor`.
///
/// Setting this too low will cause consensus messages to be dropped.
@@ -219,6 +223,7 @@ pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch";
pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate";
pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch";
pub const GOSSIP_BLOCK: &str = "gossip_block";
pub const EXECUTED_BLOCK: &str = "executed_block";
pub const GOSSIP_BLOCK_AND_BLOBS_SIDECAR: &str = "gossip_block_and_blobs_sidecar";
pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block";
pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit";
@@ -729,7 +734,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
fn from(ready_work: ReadyWork<T>) -> Self {
match ready_work {
ReadyWork::Block(QueuedGossipBlock {
ReadyWork::GossipBlock(QueuedGossipBlock {
peer_id,
block,
seen_timestamp,
@@ -741,6 +746,18 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
seen_timestamp,
},
},
ReadyWork::ExecutedBlock(QueuedExecutedBlock {
peer_id,
block,
seen_timestamp,
}) => Self {
drop_during_sync: false,
work: Work::ExecutedBlock {
peer_id,
block,
seen_timestamp,
},
},
ReadyWork::RpcBlock(QueuedRpcBlock {
block_root,
block,
@@ -872,6 +889,11 @@ pub enum Work<T: BeaconChainTypes> {
block: Box<GossipVerifiedBlock<T>>,
seen_timestamp: Duration,
},
ExecutedBlock {
peer_id: PeerId,
block: ExecutedBlock<T::EthSpec>,
seen_timestamp: Duration,
},
GossipVoluntaryExit {
message_id: MessageId,
peer_id: PeerId,
@@ -968,6 +990,7 @@ impl<T: BeaconChainTypes> Work<T> {
Work::GossipAggregate { .. } => GOSSIP_AGGREGATE,
Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH,
Work::GossipBlock { .. } => GOSSIP_BLOCK,
Work::ExecutedBlock { .. } => EXECUTED_BLOCK,
Work::GossipSignedBlobSidecar { .. } => GOSSIP_BLOCK_AND_BLOBS_SIDECAR,
Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK,
Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT,
@@ -1127,6 +1150,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN);
// Using a FIFO queue since blocks need to be imported sequentially.
let mut executed_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
@@ -1243,6 +1267,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
// on the delayed ones.
} else if let Some(item) = delayed_block_queue.pop() {
self.spawn_worker(item, toolbox);
// Check availability pending blocks
} else if let Some(item) = executed_block_queue.pop() {
self.spawn_worker(item, toolbox);
// Check gossip blocks before gossip attestations, since a block might be
// required to verify some attestations.
} else if let Some(item) = gossip_block_queue.pop() {
@@ -1462,6 +1489,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::GossipBlock { .. } => {
gossip_block_queue.push(work, work_id, &self.log)
}
Work::ExecutedBlock { .. } => {
gossip_block_queue.push(work, work_id, &self.log)
}
Work::GossipSignedBlobSidecar { .. } => {
gossip_block_and_blobs_sidecar_queue.push(work, work_id, &self.log)
}
@@ -1742,6 +1772,20 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
)
.await
}),
Work::ExecutedBlock {
peer_id,
block,
seen_timestamp,
} => task_spawner.spawn_async(async move {
worker
.process_execution_verified_block(
peer_id,
block,
work_reprocessing_tx,
seen_timestamp,
)
.await
}),
/*
* Verification for blobs sidecars received on gossip.
*/

View File

@@ -14,7 +14,9 @@ use super::MAX_SCHEDULED_WORK_QUEUE_LEN;
use crate::metrics;
use crate::sync::manager::BlockProcessType;
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
use beacon_chain::{
BeaconChainTypes, ExecutedBlock, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
};
use fnv::FnvHashMap;
use futures::task::Poll;
use futures::{Stream, StreamExt};
@@ -53,11 +55,19 @@ pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);
/// For how long to queue rpc blocks before sending them back for reprocessing.
pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3);
/// For how long to queue executed blocks before sending them back for reprocessing.
pub const QUEUED_EXECUTED_BLOCK_DELAY: Duration = Duration::from_secs(12);
/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that
/// we signature-verify blocks before putting them in the queue *should* protect against this, but
/// it's nice to have extra protection.
const MAXIMUM_QUEUED_BLOCKS: usize = 16;
/// An `ExecutedBlock` contains the entire `BeaconState`, so we shouldn't be storing too many of them
/// to avoid getting DoS'd by the block proposer.
/// TODO(pawan): revise the max blocks
const MAXIMUM_QUEUED_EXECUTED_BLOCKS: usize = 4;
/// How many attestations we keep before new ones get dropped.
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;
@@ -77,6 +87,9 @@ pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
block_root: Hash256,
parent_root: Hash256,
},
ExecutedBlock(QueuedExecutedBlock<T::EthSpec>),
/// The blobs corresponding to a `block_root` are now fully available.
BlobsAvailable(Hash256),
/// An unaggregated attestation that references an unknown block.
UnknownBlockUnaggregate(QueuedUnaggregate<T::EthSpec>),
/// An aggregated attestation that references an unknown block.
@@ -87,7 +100,8 @@ pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
/// Events sent by the scheduler once they are ready for re-processing.
pub enum ReadyWork<T: BeaconChainTypes> {
Block(QueuedGossipBlock<T>),
GossipBlock(QueuedGossipBlock<T>),
ExecutedBlock(QueuedExecutedBlock<T::EthSpec>),
RpcBlock(QueuedRpcBlock<T::EthSpec>),
Unaggregate(QueuedUnaggregate<T::EthSpec>),
Aggregate(QueuedAggregate<T::EthSpec>),
@@ -131,6 +145,14 @@ pub struct QueuedGossipBlock<T: BeaconChainTypes> {
pub seen_timestamp: Duration,
}
/// A block that has been fully verified and is pending data availability
/// and import into the beacon chain.
pub struct QueuedExecutedBlock<T: EthSpec> {
    /// The peer that sent us the block.
    pub peer_id: PeerId,
    /// The execution-verified block awaiting its blobs.
    pub block: ExecutedBlock<T>,
    /// When the block was first observed.
    pub seen_timestamp: Duration,
}
/// A block that arrived for processing when the same block was being imported over gossip.
/// It is queued for later import.
pub struct QueuedRpcBlock<T: EthSpec> {
@@ -147,6 +169,9 @@ pub struct QueuedRpcBlock<T: EthSpec> {
enum InboundEvent<T: BeaconChainTypes> {
/// A gossip block that was queued for later processing and is ready for import.
ReadyGossipBlock(QueuedGossipBlock<T>),
/// An executed block that was queued for blob availability and is now
/// ready for import
ReadyExecutedBlock(QueuedExecutedBlock<T::EthSpec>),
/// A rpc block that was queued because the same gossip block was being imported
/// will now be retried for import.
ReadyRpcBlock(QueuedRpcBlock<T::EthSpec>),
@@ -170,6 +195,8 @@ struct ReprocessQueue<T: BeaconChainTypes> {
/* Queues */
/// Queue to manage scheduled early blocks.
gossip_block_delay_queue: DelayQueue<QueuedGossipBlock<T>>,
/// Queue to manage availability pending blocks.
executed_block_delay_queue: DelayQueue<QueuedExecutedBlock<T::EthSpec>>,
/// Queue to manage scheduled early blocks.
rpc_block_delay_queue: DelayQueue<QueuedRpcBlock<T::EthSpec>>,
/// Queue to manage scheduled attestations.
@@ -180,6 +207,8 @@ struct ReprocessQueue<T: BeaconChainTypes> {
/* Queued items */
/// Queued blocks.
queued_gossip_block_roots: HashSet<Hash256>,
/// Queued availability pending blocks.
queued_executed_block_roots: HashMap<Hash256, DelayKey>,
/// Queued aggregated attestations.
queued_aggregates: FnvHashMap<usize, (QueuedAggregate<T::EthSpec>, DelayKey)>,
/// Queued attestations.
@@ -233,13 +262,21 @@ impl<T: BeaconChainTypes> Stream for ReprocessQueue<T> {
// The sequential nature of blockchains means it is generally better to try and import all
// existing blocks before new ones.
match self.gossip_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(queued_block))) => {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::ReadyGossipBlock(
queued_block.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "gossip_block_queue")));
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.executed_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::ReadyExecutedBlock(
queued_block.into_inner(),
)));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
@@ -247,40 +284,31 @@ impl<T: BeaconChainTypes> Stream for ReprocessQueue<T> {
}
match self.rpc_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(queued_block))) => {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner())));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "rpc_block_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.attestations_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(attestation_id))) => {
Poll::Ready(Some(attestation_id)) => {
return Poll::Ready(Some(InboundEvent::ReadyAttestation(
attestation_id.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "attestations_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.lc_updates_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(lc_id))) => {
Poll::Ready(Some(lc_id)) => {
return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate(
lc_id.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "lc_updates_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
@@ -313,10 +341,12 @@ pub fn spawn_reprocess_scheduler<T: BeaconChainTypes>(
work_reprocessing_rx,
ready_work_tx,
gossip_block_delay_queue: DelayQueue::new(),
executed_block_delay_queue: DelayQueue::new(),
rpc_block_delay_queue: DelayQueue::new(),
attestations_delay_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(),
queued_executed_block_roots: HashMap::new(),
queued_lc_updates: FnvHashMap::default(),
queued_aggregates: FnvHashMap::default(),
queued_unaggregates: FnvHashMap::default(),
@@ -400,7 +430,7 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
if block_slot <= now
&& self
.ready_work_tx
.try_send(ReadyWork::Block(early_block))
.try_send(ReadyWork::GossipBlock(early_block))
.is_err()
{
error!(
@@ -411,6 +441,59 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
}
}
}
InboundEvent::Msg(ExecutedBlock(executed_block)) => {
if self.executed_block_delay_queue.len() >= MAXIMUM_QUEUED_EXECUTED_BLOCKS {
// TODO(pawan): use a dedicated debounce for this queue instead of reusing the RPC block debounce.
if self.rpc_block_debounce.elapsed() {
warn!(
log,
"Executed blocks queue is full";
"queue_size" => MAXIMUM_QUEUED_EXECUTED_BLOCKS,
"msg" => "check system clock"
);
}
// TODO(pawan): the block would essentially get dropped here;
// consider whether anything better can be done.
}
// Queue the block for a slot
let block_root = executed_block.block.block_root;
if !self.queued_executed_block_roots.contains_key(&block_root) {
let key = self
.executed_block_delay_queue
.insert(executed_block, QUEUED_EXECUTED_BLOCK_DELAY);
self.queued_executed_block_roots.insert(block_root, key);
}
}
InboundEvent::Msg(BlobsAvailable(block_root)) => {
match self.queued_executed_block_roots.remove(&block_root) {
None => {
// Log an error to alert that we've made a bad assumption about how this
// program works, but still process the block anyway.
error!(
log,
"Unknown executed block in delay queue";
"block_root" => ?block_root
);
}
Some(key) => {
if let Some(executed_block) =
self.executed_block_delay_queue.try_remove(&key)
{
if self
.ready_work_tx
.try_send(ReadyWork::ExecutedBlock(executed_block.into_inner()))
.is_err()
{
error!(
log,
"Failed to pop queued block";
);
}
}
}
}
}
// A rpc block arrived for processing at the same time when a gossip block
// for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY`
// and then send the rpc block back for processing assuming the gossip import
@@ -664,6 +747,24 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
}
}
}
InboundEvent::ReadyExecutedBlock(executed_block) => {
let block_root = executed_block.block.block_root;
if self
.queued_executed_block_roots
.remove(&block_root)
.is_none()
{
// Log an error to alert that we've made a bad assumption about how this
// program works, but still process the block anyway.
error!(
log,
"Unknown block in delay queue";
"block_root" => ?block_root
);
}
// TODO(pawan): just dropping the block, rethink what can be done here
}
// A block that was queued for later processing is now ready to be processed.
InboundEvent::ReadyGossipBlock(ready_block) => {
let block_root = ready_block.block.block_root;
@@ -680,7 +781,7 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
if self
.ready_work_tx
.try_send(ReadyWork::Block(ready_block))
.try_send(ReadyWork::GossipBlock(ready_block))
.is_err()
{
error!(
@@ -689,6 +790,7 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
);
}
}
InboundEvent::DelayQueueError(e, queue_name) => {
crit!(
log,

View File

@@ -2,6 +2,7 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::blob_verification::{AsBlock, BlockWrapper, GossipVerifiedBlob};
use beacon_chain::store::Error;
use beacon_chain::ExecutedBlock;
use beacon_chain::{
attestation_verification::{self, Error as AttnError, VerifiedAttestation},
light_client_finality_update_verification::Error as LightClientFinalityUpdateError,
@@ -30,8 +31,8 @@ use types::{
use super::{
super::work_reprocessing_queue::{
QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate,
ReprocessQueueMessage,
QueuedAggregate, QueuedExecutedBlock, QueuedGossipBlock, QueuedLightClientUpdate,
QueuedUnaggregate, ReprocessQueueMessage,
},
Worker,
};
@@ -987,6 +988,104 @@ impl<T: BeaconChainTypes> Worker<T> {
}
}
/// Process a block that has already been execution-verified (an `ExecutedBlock`)
/// and was queued while awaiting blob availability.
///
/// Re-runs the data availability check and, if the block is now fully
/// available, imports it into the beacon chain.
///
/// Raises a log if there are errors; nothing is returned to the caller.
pub async fn process_execution_verified_block(
    self,
    peer_id: PeerId,
    executed_block: ExecutedBlock<T::EthSpec>,
    reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
    // This value is not used presently, but it might come in handy for debugging.
    seen_duration: Duration,
) {
    // Capture the root and a clone of the block up front, since
    // `executed_block` is consumed by the availability check below.
    let block_root = executed_block.block_root;
    let block = executed_block.block.block_cloned();
    match self
        .chain
        .check_availability_and_maybe_import(
            |chain| {
                chain
                    .data_availability_checker
                    .check_block_availability(executed_block)
            },
            CountUnrealized::True,
        )
        .await
    {
        // The blobs arrived and the block was imported successfully.
        Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
            metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
            // Notify the reprocessing queue so work waiting on this block
            // can be retried. Failure to send is logged but not fatal.
            if reprocess_tx
                .try_send(ReprocessQueueMessage::BlockImported {
                    block_root,
                    parent_root: block.message().parent_root(),
                })
                .is_err()
            {
                error!(
                    self.log,
                    "Failed to inform block import";
                    "source" => "gossip",
                    "block_root" => ?block_root,
                )
            };
            debug!(
                self.log,
                "Gossipsub block processed";
                "block" => ?block_root,
                "peer_id" => %peer_id
            );
            // The newly imported block may change the head; recompute it now.
            self.chain.recompute_head_at_current_slot().await;
        }
        Ok(AvailabilityProcessingStatus::PendingBlobs(_))
        | Ok(AvailabilityProcessingStatus::PendingBlock(_))
        | Err(BlockError::AvailabilityCheck(_)) => {
            // TODO(pawan): need to do something different if the block is
            // still unavailable after the reprocess delay.
            unimplemented!()
        }
        Err(BlockError::ParentUnknown(block)) => {
            // Inform the sync manager to find parents for this block
            // This should not occur. It should be checked by `should_forward_block`
            error!(
                self.log,
                "Block with unknown parent attempted to be processed";
                "peer_id" => %peer_id
            );
            self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block, block_root));
        }
        // Payload errors where `penalize_peer()` is false are only logged;
        // the sending peer is not penalised.
        Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => {
            debug!(
                self.log,
                "Failed to verify execution payload";
                "error" => %e
            );
        }
        // Any other outcome is treated as an invalid block: log it and
        // penalise the sending peer.
        other => {
            debug!(
                self.log,
                "Invalid gossip beacon block";
                "outcome" => ?other,
                "block root" => ?block_root,
                "block slot" => block.slot()
            );
            self.gossip_penalize_peer(
                peer_id,
                PeerAction::MidToleranceError,
                "bad_gossip_block_ssz",
            );
            trace!(
                self.log,
                "Invalid gossip beacon block ssz";
                "ssz" => format_args!("0x{}", hex::encode(block.as_ssz_bytes())),
            );
        }
    };
}
/// Process the beacon block that has already passed gossip verification.
///
/// Raises a log if there are errors.
@@ -996,7 +1095,7 @@ impl<T: BeaconChainTypes> Worker<T> {
verified_block: GossipVerifiedBlock<T>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
// This value is not used presently, but it might come in handy for debugging.
_seen_duration: Duration,
seen_duration: Duration,
) {
let block = verified_block.block.block_cloned();
let block_root = verified_block.block_root;
@@ -1044,6 +1143,32 @@ impl<T: BeaconChainTypes> Worker<T> {
}
Ok(AvailabilityProcessingStatus::PendingBlobs(blob_ids)) => {
// make rpc request for blob
// let block_slot = block.block.slot();
// // Make rpc request for blobs
// self.send_sync_message(SyncMessage::UnknownBlobHash {
// peer_id,
// block_root: block.block_root,
// });
// // Send block to reprocessing queue to await blobs
// if reprocess_tx
// .try_send(ReprocessQueueMessage::ExecutedBlock(QueuedExecutedBlock {
// peer_id,
// block,
// seen_timestamp: seen_duration,
// }))
// .is_err()
// {
// error!(
// self.log,
// "Failed to send partially verified block to reprocessing queue";
// "block_slot" => %block_slot,
// "block_root" => ?block_root,
// "location" => "block gossip"
// )
// }
}
Err(BlockError::AvailabilityCheck(_)) => {
todo!()
}
Err(BlockError::ParentUnknown(block)) => {

View File

@@ -119,6 +119,14 @@ pub enum SyncMessage<T: EthSpec> {
/// manager to attempt to find the block matching the unknown hash.
UnknownBlockHash(PeerId, Hash256),
/// A peer has sent us a block that we haven't received all the blobs for. This triggers
/// the manager to attempt to find the blobs for the given block root.
/// TODO: add required blob indices as well.
UnknownBlobHash {
peer_id: PeerId,
block_root: Hash256,
},
/// A peer has disconnected.
Disconnect(PeerId),
@@ -598,6 +606,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
.search_block(block_hash, peer_id, &mut self.network);
}
}
SyncMessage::UnknownBlobHash { .. } => {
unimplemented!()
}
SyncMessage::Disconnect(peer_id) => {
self.peer_disconnect(&peer_id);
}