diff --git a/src/decentralizepy/communication/TCPRandomWalk.py b/src/decentralizepy/communication/TCPRandomWalk.py index 9943134ea1ec7249dfe2fba3330f441663dc8729..b1055afd61d7b3bb83394d0ca31329386718563a 100644 --- a/src/decentralizepy/communication/TCPRandomWalk.py +++ b/src/decentralizepy/communication/TCPRandomWalk.py @@ -223,11 +223,11 @@ class TCPRandomWalk(TCPRandomWalkBase): self.send_queue = mp.Queue(1000) self.recv_queue = mp.Queue(1000) # Since we are only adding these do not need to share the same lock, intermediary results may be inconsistent - self.lock = Lock() - self.total_data = Value(c_long, 0, lock = self.lock) - self.total_meta = Value(c_long, 0, lock = self.lock) - self.total_bytes = Value(c_long, 0, lock = self.lock) - self.flag_running = Value(c_int, 0, lock=False) + self.lock = mp.Lock() + self.total_data = mp.Value(c_long, 0, lock = self.lock) + self.total_meta = mp.Value(c_long, 0, lock = self.lock) + self.total_bytes = mp.Value(c_long, 0, lock = self.lock) + self.flag_running = mp.Value(c_int, 0, lock=False) def connect_neighbors(self, neighbors): """ @@ -305,14 +305,14 @@ class TCPRandomWalk(TCPRandomWalkBase): """ print("disconnect_neighbors") - self.flag_running.value = 0 - del self.lock - self.send_queue.close() # this crashes + self.send_queue.close() # this crashes self.recv_queue.close() + self.flag_running.value = 0 + #del self.lock self.ctx.join() - print("disconnect_neighbors: joined") - # self.send_queue.close() # this crashes - # self.recv_queue.close() + self.send_queue.join_thread() + self.recv_queue.join_thread() + print(f"disconnect_neighbors: joined {self.uid}") class TCPRandomWalkInternal(TCPRandomWalkBase): @@ -404,9 +404,18 @@ class TCPRandomWalkInternal(TCPRandomWalkBase): print(error_message) logging.debug("GOT EXCEPTION") logging.debug(error_message) - + while not self.recv_queue.empty(): + print(f"{self.uid}: clear rcv") + _ = self.recv_queue.get_nowait() self.recv_queue.close() + self.recv_queue.join_thread() + print(f"{self.uid}: joined recv") + while not self.send_queue.empty(): + print(f"{self.uid}: clear snd") + _ = self.send_queue.get_nowait() + print(f"{self.uid}: joined send") self.send_queue.close() + self.send_queue.join_thread() print("end") def __del__(self): @@ -639,6 +648,9 @@ class TCPRandomWalkInternal(TCPRandomWalkBase): # raise RuntimeError( # "Received a message when expecting BYE from {}".format(sender) # ) + for sock in self.peer_sockets.values(): + sock.close() + self.router.close() def rw_sampler_equi(self, message):