Skip to content
Snippets Groups Projects
Commit 4e3d3d2b authored by Jeffrey Wigger's avatar Jeffrey Wigger
Browse files

this finishes most of the time

parent b2fe436a
No related branches found
No related tags found
No related merge requests found
......@@ -223,11 +223,11 @@ class TCPRandomWalk(TCPRandomWalkBase):
self.send_queue = mp.Queue(1000)
self.recv_queue = mp.Queue(1000)
# Since we are only adding these do not need to share the same lock, intermediary results may be inconsistent
self.lock = Lock()
self.total_data = Value(c_long, 0, lock = self.lock)
self.total_meta = Value(c_long, 0, lock = self.lock)
self.total_bytes = Value(c_long, 0, lock = self.lock)
self.flag_running = Value(c_int, 0, lock=False)
self.lock = mp.Lock()
self.total_data = mp.Value(c_long, 0, lock = self.lock)
self.total_meta = mp.Value(c_long, 0, lock = self.lock)
self.total_bytes = mp.Value(c_long, 0, lock = self.lock)
self.flag_running = mp.Value(c_int, 0, lock=False)
def connect_neighbors(self, neighbors):
"""
......@@ -305,14 +305,14 @@ class TCPRandomWalk(TCPRandomWalkBase):
"""
print("disconnect_neighbors")
self.flag_running.value = 0
del self.lock
self.send_queue.close() # this crashes
self.send_queue.close() # this crashes
self.recv_queue.close()
self.flag_running.value = 0
#del self.lock
self.ctx.join()
print("disconnect_neighbors: joined")
# self.send_queue.close() # this crashes
# self.recv_queue.close()
self.send_queue.join_thread()
self.recv_queue.join_thread()
print(f"disconnect_neighbors: joined {self.uid}")
class TCPRandomWalkInternal(TCPRandomWalkBase):
......@@ -404,9 +404,18 @@ class TCPRandomWalkInternal(TCPRandomWalkBase):
print(error_message)
logging.debug("GOT EXCEPTION")
logging.debug(error_message)
while not self.recv_queue.empty():
print(f"{self.uid}: clear rcv")
_ = self.recv_queue.get_nowait()
self.recv_queue.close()
self.recv_queue.join_thread()
print(f"{self.uid}: joined recv")
while not self.send_queue.empty():
print(f"{self.uid}: clear snd")
_ = self.send_queue.get_nowait()
print(f"{self.uid}: joined send")
self.send_queue.close()
self.send_queue.join_thread()
print("end")
def __del__(self):
......@@ -639,6 +648,9 @@ class TCPRandomWalkInternal(TCPRandomWalkBase):
# raise RuntimeError(
# "Received a message when expecting BYE from {}".format(sender)
# )
for sock in self.peer_sockets.values():
sock.close()
self.router.close()
def rw_sampler_equi(self, message):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment