From 072689b1ef726ce468c2590e3c691b962c2bc180 Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Thu, 5 May 2022 14:19:06 +0200 Subject: [PATCH 01/17] merging compression for PartialModel --- src/decentralizepy/communication/TCP.py | 32 ++++++++++++++++++---- src/decentralizepy/node/Node.py | 8 +++--- src/decentralizepy/sharing/PartialModel.py | 15 +++++----- src/decentralizepy/sharing/Sharing.py | 3 +- 4 files changed, 39 insertions(+), 19 deletions(-) diff --git a/src/decentralizepy/communication/TCP.py b/src/decentralizepy/communication/TCP.py index fed9301..495b82d 100644 --- a/src/decentralizepy/communication/TCP.py +++ b/src/decentralizepy/communication/TCP.py @@ -5,7 +5,11 @@ import pickle from collections import deque import zmq +import lz4.frame +from pathlib import Path +import os +import decentralizepy.communication.elias as elias from decentralizepy.communication.Communication import Communication HELLO = b"HELLO" @@ -85,6 +89,10 @@ class TCP(Communication): self.sent_disconnections = False self.compress = compress + self.total_data = 0 + self.total_meta = 0 + + self.peer_deque = deque() self.peer_sockets = dict() self.barrier = set() @@ -112,11 +120,18 @@ class TCP(Communication): """ if self.compress: - compressor = lzma.LZMACompressor() - output = compressor.compress(pickle.dumps(data)) + compressor.flush() + if "indices" in data: + data["indices"] = lz4.frame.compress(data["indices"]) + output = pickle.dumps(data) + # the compressed meta data gets only a few bytes smaller after pickling + self.total_meta += len(data["indices"]) + self.total_data += (len(output) - len(data["indices"])) else: output = pickle.dumps(data) - + if type(data) == dict and "indices" in data: # otherwise it is a centralized testing message + meta_len = len(pickle.dumps(data["indices"])) + self.total_meta += meta_len + self.total_data += (len(output) - meta_len) return output def decrypt(self, sender, data): @@ -138,7 +153,9 @@ class TCP(Communication): """ sender = int(sender.decode()) if self.compress: - data = pickle.loads(lzma.decompress(data)) + data = pickle.loads(data) + if "indices" in data: + data["indices"] = lz4.frame.decompress(data["indices"]) else: data = pickle.loads(data) return sender, data @@ -226,7 +243,7 @@ class TCP(Communication): logging.debug("Received message from {}".format(sender)) return self.decrypt(sender, recv) - def send(self, uid, data): + def send(self, uid, data, encrypt = True): """ Send a message to a process. @@ -239,7 +256,10 @@ class TCP(Communication): """ assert self.initialized == True - to_send = self.encrypt(data) + if encrypt: + to_send = self.encrypt(data) + else: + to_send = data data_size = len(to_send) self.total_bytes += data_size id = str(uid).encode() diff --git a/src/decentralizepy/node/Node.py b/src/decentralizepy/node/Node.py index e387c58..4c93a00 100644 --- a/src/decentralizepy/node/Node.py +++ b/src/decentralizepy/node/Node.py @@ -433,12 +433,12 @@ class Node: results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes - if hasattr(self.sharing, "total_meta"): - results_dict["total_meta"][iteration + 1] = self.sharing.total_meta - if hasattr(self.sharing, "total_data"): + if hasattr(self.communication, "total_meta"): + results_dict["total_meta"][iteration + 1] = self.communication.total_meta + if hasattr(self.communication, "total_data"): results_dict["total_data_per_n"][ iteration + 1 - ] = self.sharing.total_data + ] = self.communication.total_data if hasattr(self.sharing, "mean"): results_dict["grad_mean"][iteration + 1] = self.sharing.mean if hasattr(self.sharing, "std"): diff --git a/src/decentralizepy/sharing/PartialModel.py b/src/decentralizepy/sharing/PartialModel.py index 018f895..f27ec9d 100644 --- a/src/decentralizepy/sharing/PartialModel.py +++ b/src/decentralizepy/sharing/PartialModel.py @@ -202,23 +202,22 @@ class PartialModel(Sharing): if not self.dict_ordered: raise NotImplementedError - - m["alpha"] = self.alpha - - m["indices"] = G_topk.numpy().astype(np.int32) + sorted32 = G_topk.numpy() + sorted32.sort() + print(sorted32.shape) + diff = np.diff(sorted32, prepend = 0).astype(np.int32) + print(diff.dtype) + m["indices"] = diff.tobytes("C") m["params"] = T_topk.numpy() m["send_partial"] = True - assert len(m["indices"]) == len(m["params"]) logging.info("Elements sending: {}".format(len(m["indices"]))) logging.info("Generated dictionary to send") logging.info("Converted dictionary to pickle") - self.total_data += len(self.communication.encrypt(m["params"])) - self.total_meta += len(self.communication.encrypt(m["indices"])) return m @@ -256,7 +255,7 @@ class PartialModel(Sharing): tensors_to_cat.append(t) T = torch.cat(tensors_to_cat, dim=0) - index_tensor = torch.tensor(m["indices"], dtype=torch.long) + index_tensor = torch.tensor(np.cumsum(np.frombuffer(m["indices"], dtype=np.int32)), dtype=torch.long) logging.debug("Original tensor: {}".format(T[index_tensor])) T[index_tensor] = torch.tensor(m["params"]) logging.debug("Final tensor: {}".format(T[index_tensor])) diff --git a/src/decentralizepy/sharing/Sharing.py b/src/decentralizepy/sharing/Sharing.py index 9f7645b..0e77008 100644 --- a/src/decentralizepy/sharing/Sharing.py +++ b/src/decentralizepy/sharing/Sharing.py @@ -177,8 +177,9 @@ class Sharing: iter_neighbors = self.get_neighbors(all_neighbors) data["degree"] = len(all_neighbors) data["iteration"] = self.communication_round + encrypted = self.communication.encrypt(data) for neighbor in iter_neighbors: - self.communication.send(neighbor, data) + self.communication.send(neighbor, encrypted, encrypt = False) logging.info("Waiting for messages from neighbors") while not self.received_from_all(): -- GitLab From 7b122a23263661752e5eec97cc0fcd65578da25e Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Thu, 5 May 2022 14:46:47 +0200 Subject: [PATCH 02/17] compression on wavelet --- src/decentralizepy/sharing/Wavelet.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/decentralizepy/sharing/Wavelet.py b/src/decentralizepy/sharing/Wavelet.py index 407714d..cdf3fd0 100644 --- a/src/decentralizepy/sharing/Wavelet.py +++ b/src/decentralizepy/sharing/Wavelet.py @@ -215,20 +215,18 @@ class Wavelet(PartialModel): if not self.dict_ordered: raise NotImplementedError + sorted32 = indices.numpy() + sorted32.sort() + diff = np.diff(sorted32, prepend = 0).astype(np.int32) + m["indices"] = diff.tobytes("C") m["alpha"] = self.alpha m["params"] = topk.numpy() - m["indices"] = indices.numpy().astype(np.int32) m["send_partial"] = True - self.total_data += len(self.communication.encrypt(m["params"])) - self.total_meta += len(self.communication.encrypt(m["indices"])) + len( - self.communication.encrypt(m["alpha"]) - ) - return m def deserialized_model(self, m): @@ -256,13 +254,11 @@ class Wavelet(PartialModel): with torch.no_grad(): if not self.dict_ordered: raise NotImplementedError - - indices = m["indices"] alpha = m["alpha"] params = m["params"] params_tensor = torch.tensor(params) - indices_tensor = torch.tensor(indices, dtype=torch.long) + indices_tensor = torch.tensor(np.cumsum(np.frombuffer(m["indices"], dtype=np.int32)), dtype=torch.long) ret = dict() ret["indices"] = indices_tensor ret["params"] = params_tensor -- GitLab From ee72c43d0e12ee6ef85f3bbf330c393e54ea29f0 Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Thu, 5 May 2022 15:35:58 +0200 Subject: [PATCH 03/17] minor refactor --- src/decentralizepy/communication/TCP.py | 7 +-- src/decentralizepy/compression/Compression.py | 45 ++++++++++++++++ src/decentralizepy/compression/Lz4Wrapper.py | 52 +++++++++++++++++++ src/decentralizepy/sharing/PartialModel.py | 10 ++-- src/decentralizepy/sharing/Wavelet.py | 10 ++-- 5 files changed, 107 insertions(+), 17 deletions(-) create mode 100644 src/decentralizepy/compression/Compression.py create mode 100644 src/decentralizepy/compression/Lz4Wrapper.py diff --git a/src/decentralizepy/communication/TCP.py b/src/decentralizepy/communication/TCP.py index 495b82d..c364f8c 100644 --- a/src/decentralizepy/communication/TCP.py +++ b/src/decentralizepy/communication/TCP.py @@ -9,8 +9,8 @@ import lz4.frame from pathlib import Path import os -import decentralizepy.communication.elias as elias from decentralizepy.communication.Communication import Communication +from decentralizepy.compression.Lz4Wrapper import Lz4Wrapper HELLO = b"HELLO" BYE = b"BYE" @@ -88,6 +88,7 @@ class TCP(Communication): self.router.bind(self.addr(rank, machine_id)) self.sent_disconnections = False self.compress = compress + self.compressor = Lz4Wrapper() self.total_data = 0 self.total_meta = 0 @@ -121,7 +122,7 @@ class TCP(Communication): """ if self.compress: if "indices" in data: - data["indices"] = lz4.frame.compress(data["indices"]) + data["indices"] = self.compressor.compress(data["indices"]) output = pickle.dumps(data) # the compressed meta data gets only a few bytes smaller after pickling self.total_meta += len(data["indices"]) @@ -155,7 +156,7 @@ class TCP(Communication): if self.compress: data = pickle.loads(data) if "indices" in data: - data["indices"] = lz4.frame.decompress(data["indices"]) + data["indices"] = self.compressor.decompress(data["indices"]) else: data = pickle.loads(data) return sender, data diff --git a/src/decentralizepy/compression/Compression.py b/src/decentralizepy/compression/Compression.py new file mode 100644 index 0000000..e1a575a --- /dev/null +++ b/src/decentralizepy/compression/Compression.py @@ -0,0 +1,45 @@ +import numpy as np +class Communication: + """ + Compression API + + """ + + def __init__(self): + """ + Constructor + """ + + def compress(self, arr): + """ + compression function + + Parameters + ---------- + arr : np.ndarray + Data to compress + + Returns + ------- + bytearray + encoded data as bytes + + """ + raise NotImplementedError + + def decompress(self, bytes): + """ + decompression function + + Parameters + ---------- + bytes :bytearray + compressed data + + Returns + ------- + arr : np.ndarray + decompressed data as array + + """ + raise NotImplementedError \ No newline at end of file diff --git a/src/decentralizepy/compression/Lz4Wrapper.py b/src/decentralizepy/compression/Lz4Wrapper.py new file mode 100644 index 0000000..e204ea3 --- /dev/null +++ b/src/decentralizepy/compression/Lz4Wrapper.py @@ -0,0 +1,52 @@ +import numpy as np +import lz4.frame +class Lz4Wrapper: + """ + Compression API + + """ + + def __init__(self): + """ + Constructor + """ + + def compress(self, arr): + """ + compression function + + Parameters + ---------- + arr : np.ndarray + Data to compress + + Returns + ------- + bytearray + encoded data as bytes + + """ + arr.sort() + print(arr.shape) + diff = np.diff(arr, prepend=0).astype(np.int32) + print(diff.dtype) + to_compress = diff.tobytes("C") + return lz4.frame.compress(to_compress) + + def decompress(self, bytes): + """ + decompression function + + Parameters + ---------- + bytes :bytearray + compressed data + + Returns + ------- + arr : np.ndarray + decompressed data as array + + """ + decomp = lz4.frame.decompress(bytes) + return np.cumsum(np.frombuffer(decomp, dtype=np.int32)) \ No newline at end of file diff --git a/src/decentralizepy/sharing/PartialModel.py b/src/decentralizepy/sharing/PartialModel.py index f27ec9d..19c8654 100644 --- a/src/decentralizepy/sharing/PartialModel.py +++ b/src/decentralizepy/sharing/PartialModel.py @@ -202,12 +202,8 @@ class PartialModel(Sharing): if not self.dict_ordered: raise NotImplementedError - sorted32 = G_topk.numpy() - sorted32.sort() - print(sorted32.shape) - diff = np.diff(sorted32, prepend = 0).astype(np.int32) - print(diff.dtype) - m["indices"] = diff.tobytes("C") + + m["indices"] = G_topk.numpy() m["params"] = T_topk.numpy() @@ -255,7 +251,7 @@ class PartialModel(Sharing): tensors_to_cat.append(t) T = torch.cat(tensors_to_cat, dim=0) - index_tensor = torch.tensor(np.cumsum(np.frombuffer(m["indices"], dtype=np.int32)), dtype=torch.long) + index_tensor = torch.tensor(m["indices"], dtype=torch.long) logging.debug("Original tensor: {}".format(T[index_tensor])) T[index_tensor] = torch.tensor(m["params"]) logging.debug("Final tensor: {}".format(T[index_tensor])) diff --git a/src/decentralizepy/sharing/Wavelet.py b/src/decentralizepy/sharing/Wavelet.py index cdf3fd0..ba8763d 100644 --- a/src/decentralizepy/sharing/Wavelet.py +++ b/src/decentralizepy/sharing/Wavelet.py @@ -215,10 +215,7 @@ class Wavelet(PartialModel): if not self.dict_ordered: raise NotImplementedError - sorted32 = indices.numpy() - sorted32.sort() - diff = np.diff(sorted32, prepend = 0).astype(np.int32) - m["indices"] = diff.tobytes("C") + m["indices"] = indices.numpy() m["alpha"] = self.alpha @@ -255,10 +252,9 @@ class Wavelet(PartialModel): if not self.dict_ordered: raise NotImplementedError alpha = m["alpha"] - params = m["params"] - params_tensor = torch.tensor(params) - indices_tensor = torch.tensor(np.cumsum(np.frombuffer(m["indices"], dtype=np.int32)), dtype=torch.long) + params_tensor = torch.tensor(m["params"]) + indices_tensor = torch.tensor(m["indices"], dtype=torch.long) ret = dict() ret["indices"] = indices_tensor ret["params"] = params_tensor -- GitLab From 9652e74d71b2e0d78240cf1f1b257d02031941e6 Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Thu, 5 May 2022 15:52:47 +0200 Subject: [PATCH 04/17] reorders and minor fixes --- src/decentralizepy/compression/Lz4Wrapper.py | 2 -- src/decentralizepy/sharing/PartialModel.py | 3 +++ src/decentralizepy/sharing/Wavelet.py | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/decentralizepy/compression/Lz4Wrapper.py b/src/decentralizepy/compression/Lz4Wrapper.py index e204ea3..b10155d 100644 --- a/src/decentralizepy/compression/Lz4Wrapper.py +++ b/src/decentralizepy/compression/Lz4Wrapper.py @@ -27,9 +27,7 @@ class Lz4Wrapper: """ arr.sort() - print(arr.shape) diff = np.diff(arr, prepend=0).astype(np.int32) - print(diff.dtype) to_compress = diff.tobytes("C") return lz4.frame.compress(to_compress) diff --git a/src/decentralizepy/sharing/PartialModel.py b/src/decentralizepy/sharing/PartialModel.py index 19c8654..3bc19cc 100644 --- a/src/decentralizepy/sharing/PartialModel.py +++ b/src/decentralizepy/sharing/PartialModel.py @@ -203,12 +203,15 @@ class PartialModel(Sharing): if not self.dict_ordered: raise NotImplementedError + m["alpha"] = self.alpha + m["indices"] = G_topk.numpy() m["params"] = T_topk.numpy() m["send_partial"] = True + assert len(m["indices"]) == len(m["params"]) logging.info("Elements sending: {}".format(len(m["indices"]))) logging.info("Generated dictionary to send") diff --git a/src/decentralizepy/sharing/Wavelet.py b/src/decentralizepy/sharing/Wavelet.py index ba8763d..787d922 100644 --- a/src/decentralizepy/sharing/Wavelet.py +++ b/src/decentralizepy/sharing/Wavelet.py @@ -215,12 +215,12 @@ class Wavelet(PartialModel): if not self.dict_ordered: raise NotImplementedError - m["indices"] = indices.numpy() m["alpha"] = self.alpha m["params"] = topk.numpy() + m["indices"] = indices.numpy() m["send_partial"] = True -- GitLab From 85b65c585ade4143a5ebc479572860f7dba592c3 Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Thu, 5 May 2022 16:06:57 +0200 Subject: [PATCH 05/17] readded conversion to int32 if not compressing --- src/decentralizepy/sharing/PartialModel.py | 2 +- src/decentralizepy/sharing/Wavelet.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/decentralizepy/sharing/PartialModel.py b/src/decentralizepy/sharing/PartialModel.py index 3bc19cc..daf9b8d 100644 --- a/src/decentralizepy/sharing/PartialModel.py +++ b/src/decentralizepy/sharing/PartialModel.py @@ -205,7 +205,7 @@ class PartialModel(Sharing): m["alpha"] = self.alpha - m["indices"] = G_topk.numpy() + m["indices"] = G_topk.numpy().astype(np.int32) m["params"] = T_topk.numpy() diff --git a/src/decentralizepy/sharing/Wavelet.py b/src/decentralizepy/sharing/Wavelet.py index 787d922..516fa05 100644 --- a/src/decentralizepy/sharing/Wavelet.py +++ b/src/decentralizepy/sharing/Wavelet.py @@ -220,7 +220,7 @@ class Wavelet(PartialModel): m["params"] = topk.numpy() - m["indices"] = indices.numpy() + m["indices"] = indices.numpy().astype(np.int32) m["send_partial"] = True -- GitLab From 29c06c87256f62d6b48fa3c61d988e6e7b35be37 Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Thu, 5 May 2022 16:11:00 +0200 Subject: [PATCH 06/17] compressing both data and indices --- src/decentralizepy/communication/TCP.py | 4 ++ src/decentralizepy/compression/Compression.py | 35 +++++++++++++++++ src/decentralizepy/compression/Lz4Wrapper.py | 38 ++++++++++++++++++- 3 files changed, 76 insertions(+), 1 deletion(-) diff --git a/src/decentralizepy/communication/TCP.py b/src/decentralizepy/communication/TCP.py index c364f8c..070d057 100644 --- a/src/decentralizepy/communication/TCP.py +++ b/src/decentralizepy/communication/TCP.py @@ -123,6 +123,8 @@ class TCP(Communication): if self.compress: if "indices" in data: data["indices"] = self.compressor.compress(data["indices"]) + if "params" in data: + data["params"] = self.compressor.compress_float(data["params"]) output = pickle.dumps(data) # the compressed meta data gets only a few bytes smaller after pickling self.total_meta += len(data["indices"]) @@ -157,6 +159,8 @@ class TCP(Communication): data = pickle.loads(data) if "indices" in data: data["indices"] = self.compressor.decompress(data["indices"]) + if "params" in data: + data["params"] = self.compressor.decompress_float(data["params"]) else: data = pickle.loads(data) return sender, data diff --git a/src/decentralizepy/compression/Compression.py b/src/decentralizepy/compression/Compression.py index e1a575a..253d808 100644 --- a/src/decentralizepy/compression/Compression.py +++ b/src/decentralizepy/compression/Compression.py @@ -31,6 +31,41 @@ class Communication: """ decompression function + Parameters + ---------- + bytes :bytearray + compressed data + + Returns + ------- + arr : np.ndarray + decompressed data as array + + """ + raise NotImplementedError + + + def compress_float(self, arr): + """ + compression function for float arrays + + Parameters + ---------- + arr : np.ndarray + Data to compress + + Returns + ------- + bytearray + encoded data as bytes + + """ + raise NotImplementedError + + def decompress_float(self, bytes): + """ + decompression function for compressed float arrays + Parameters ---------- bytes :bytearray diff --git a/src/decentralizepy/compression/Lz4Wrapper.py b/src/decentralizepy/compression/Lz4Wrapper.py index b10155d..0dfd8a7 100644 --- a/src/decentralizepy/compression/Lz4Wrapper.py +++ b/src/decentralizepy/compression/Lz4Wrapper.py @@ -47,4 +47,40 @@ class Lz4Wrapper: """ decomp = lz4.frame.decompress(bytes) - return np.cumsum(np.frombuffer(decomp, dtype=np.int32)) \ No newline at end of file + return np.cumsum(np.frombuffer(decomp, dtype=np.int32)) + + def compress_float(self, arr): + """ + compression function for float arrays + + Parameters + ---------- + arr : np.ndarray + Data to compress + + Returns + ------- + bytearray + encoded data as bytes + + """ + to_compress = arr.tobytes("C") + return lz4.frame.compress(to_compress) + + def decompress_float(self, bytes): + """ + decompression function for compressed float arrays + + Parameters + ---------- + bytes :bytearray + compressed data + + Returns + ------- + arr : np.ndarray + decompressed data as array + + """ + decomp = lz4.frame.decompress(bytes) + return np.frombuffer(decomp, dtype=np.float32) \ No newline at end of file -- GitLab From 6e1dbe0251f5211689c1c936f454a41b875fd1c2 Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Thu, 5 May 2022 18:38:00 +0200 Subject: [PATCH 07/17] adding options to the Lz4Wrapper to compress only the metadata --- src/decentralizepy/communication/TCP.py | 2 +- src/decentralizepy/compression/Lz4Wrapper.py | 32 +++++++++++++------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/src/decentralizepy/communication/TCP.py b/src/decentralizepy/communication/TCP.py index 070d057..6d6eb3c 100644 --- a/src/decentralizepy/communication/TCP.py +++ b/src/decentralizepy/communication/TCP.py @@ -131,7 +131,7 @@ class TCP(Communication): self.total_data += (len(output) - len(data["indices"])) else: output = pickle.dumps(data) - if type(data) == dict and "indices" in data: # otherwise it is a centralized testing message + if type(data) == dict and "indices" in data: # otherwise it is a centralized testing message meta_len = len(pickle.dumps(data["indices"])) self.total_meta += meta_len self.total_data += (len(output) - meta_len) diff --git a/src/decentralizepy/compression/Lz4Wrapper.py b/src/decentralizepy/compression/Lz4Wrapper.py index 0dfd8a7..136fea0 100644 --- a/src/decentralizepy/compression/Lz4Wrapper.py +++ b/src/decentralizepy/compression/Lz4Wrapper.py @@ -6,10 +6,12 @@ class Lz4Wrapper: """ - def __init__(self): + def __init__(self, compress_metadata = True, compress_data = False): """ Constructor """ + self.compress_metadata = compress_metadata + self.compress_data = compress_data def compress(self, arr): """ @@ -26,10 +28,12 @@ class Lz4Wrapper: encoded data as bytes """ - arr.sort() - diff = np.diff(arr, prepend=0).astype(np.int32) - to_compress = diff.tobytes("C") - return lz4.frame.compress(to_compress) + if self.compress_metadata: + arr.sort() + diff = np.diff(arr, prepend=0).astype(np.int32) + to_compress = diff.tobytes("C") + return lz4.frame.compress(to_compress) + return arr def decompress(self, bytes): """ @@ -46,8 +50,10 @@ class Lz4Wrapper: decompressed data as array """ - decomp = lz4.frame.decompress(bytes) - return np.cumsum(np.frombuffer(decomp, dtype=np.int32)) + if self.compress_metadata: + decomp = lz4.frame.decompress(bytes) + return np.cumsum(np.frombuffer(decomp, dtype=np.int32)) + return bytes def compress_float(self, arr): """ @@ -64,8 +70,10 @@ class Lz4Wrapper: encoded data as bytes """ - to_compress = arr.tobytes("C") - return lz4.frame.compress(to_compress) + if self.compress_data: + to_compress = arr.tobytes("C") + return lz4.frame.compress(to_compress) + return arr def decompress_float(self, bytes): """ @@ -82,5 +90,7 @@ class Lz4Wrapper: decompressed data as array """ - decomp = lz4.frame.decompress(bytes) - return np.frombuffer(decomp, dtype=np.float32) \ No newline at end of file + if self.compress_data: + decomp = lz4.frame.decompress(bytes) + return np.frombuffer(decomp, dtype=np.float32) + return bytes \ No newline at end of file -- GitLab From e6e580e5159e9bbaef806810517ccca3185540be Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Thu, 5 May 2022 21:17:00 +0200 Subject: [PATCH 08/17] added Elias --- src/decentralizepy/communication/TCP.py | 4 +- src/decentralizepy/compression/Elias.py | 119 ++++++++++++++++++++++++ 2 files changed, 121 insertions(+), 2 deletions(-) create mode 100644 src/decentralizepy/compression/Elias.py diff --git a/src/decentralizepy/communication/TCP.py b/src/decentralizepy/communication/TCP.py index 6d6eb3c..3826fad 100644 --- a/src/decentralizepy/communication/TCP.py +++ b/src/decentralizepy/communication/TCP.py @@ -11,7 +11,7 @@ import os from decentralizepy.communication.Communication import Communication from decentralizepy.compression.Lz4Wrapper import Lz4Wrapper - +from decentralizepy.compression.Elias import Elias HELLO = b"HELLO" BYE = b"BYE" @@ -88,7 +88,7 @@ class TCP(Communication): self.router.bind(self.addr(rank, machine_id)) self.sent_disconnections = False self.compress = compress - self.compressor = Lz4Wrapper() + self.compressor = Elias() self.total_data = 0 self.total_meta = 0 diff --git a/src/decentralizepy/compression/Elias.py b/src/decentralizepy/compression/Elias.py new file mode 100644 index 0000000..744fda9 --- /dev/null +++ b/src/decentralizepy/compression/Elias.py @@ -0,0 +1,119 @@ +# elias implementation: taken from this stack overflow post: +# https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma +import numpy as np + +class Elias: + """ + Compression API + + """ + + def __init__(self): + """ + Constructor + """ + + def compress(self, arr): + """ + compression function + + Parameters + ---------- + arr : np.ndarray + Data to compress + + Returns + ------- + bytearray + encoded data as bytes + + """ + print("begin") + arr = arr.view(f'u{arr.itemsize}') + l = np.log2(arr).astype('u1') + L = ((l << 1) + 1).cumsum() + out = np.zeros(int(L[-1] + 64), 'u1') + for i in range(l.max() + 1): + out[L - i - 1] += (arr >> i) & 1 + + s = np.array([out.size], dtype=np.int64) + size = np.ndarray(8, dtype='u1', buffer=s.data) + packed = np.packbits(out) + packed[-8:] = size + print("end") + return packed + + def decompress(self, bytes): + """ + decompression function + + Parameters + ---------- + bytes :bytearray + compressed data + + Returns + ------- + arr : np.ndarray + decompressed data as array + + """ + n_arr = bytes[-8:] + n = np.ndarray(1, dtype=np.int64, buffer=n_arr.data)[0] + b = bytes[:-8] + b = np.unpackbits(b, count=n).view(bool) + s = b.nonzero()[0] + s = (s << 1).repeat(np.diff(s, prepend=-1)) + s -= np.arange(-1, len(s) - 1) + s = s.tolist() # list has faster __getitem__ + ns = len(s) + + def gen(): + idx = 0 + yield idx + while idx < ns: + idx = s[idx] + yield idx + + offs = np.fromiter(gen(), int) + sz = np.diff(offs) >> 1 + mx = sz.max() + 1 + out = np.zeros(offs.size - 1, int) + for i in range(mx): + out[b[offs[1:] - i - 1] & (sz >= i)] += 1 << i + return out + + + def compress_float(self, arr): + """ + compression function for float arrays + + Parameters + ---------- + arr : np.ndarray + Data to compress + + Returns + ------- + bytearray + encoded data as bytes + + """ + return arr + + def decompress_float(self, bytes): + """ + decompression function for compressed float arrays + + Parameters + ---------- + bytes :bytearray + compressed data + + Returns + ------- + arr : np.ndarray + decompressed data as array + + """ + return bytes \ No newline at end of file -- GitLab From ec79ad7c5bf2013c173c3afddf5bd4b7e3d6b184 Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Fri, 6 May 2022 10:19:03 +0200 Subject: [PATCH 09/17] Elias fix --- src/decentralizepy/compression/Elias.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/decentralizepy/compression/Elias.py b/src/decentralizepy/compression/Elias.py index 744fda9..bf0b30d 100644 --- a/src/decentralizepy/compression/Elias.py +++ b/src/decentralizepy/compression/Elias.py @@ -28,7 +28,8 @@ class Elias: encoded data as bytes """ - print("begin") + arr.sort() + arr = np.diff(arr, prepend=0).astype(np.int32) arr = arr.view(f'u{arr.itemsize}') l = np.log2(arr).astype('u1') L = ((l << 1) + 1).cumsum() @@ -40,7 +41,6 @@ class Elias: size = np.ndarray(8, dtype='u1', buffer=s.data) packed = np.packbits(out) packed[-8:] = size - print("end") return packed def decompress(self, bytes): @@ -81,6 +81,7 @@ class Elias: out = np.zeros(offs.size - 1, int) for i in range(mx): out[b[offs[1:] - i - 1] & (sz >= i)] += 1 << i + out = np.cumsum(out) return out -- GitLab From 0a1eea44740a6359786c38724e795060e898c79c Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Fri, 6 May 2022 13:33:26 +0200 Subject: [PATCH 10/17] Elias fix 2: the first index can be a zero which crashes elias, now the first index gets sent uncompressed --- setup.cfg | 1 + src/decentralizepy/compression/Elias.py | 20 +++++--- src/decentralizepy/compression/EliasFpzip.py | 51 ++++++++++++++++++++ 3 files changed, 66 insertions(+), 6 deletions(-) create mode 100644 src/decentralizepy/compression/EliasFpzip.py diff --git a/setup.cfg b/setup.cfg index d7f7a7f..08f34d4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -46,6 +46,7 @@ install_requires = pandas crudini sklearn + lz4 include_package_data = True python_requires = >=3.6 [options.packages.find] diff --git a/src/decentralizepy/compression/Elias.py b/src/decentralizepy/compression/Elias.py index bf0b30d..dfdeee5 100644 --- a/src/decentralizepy/compression/Elias.py +++ b/src/decentralizepy/compression/Elias.py @@ -1,7 +1,7 @@ # elias implementation: taken from this stack overflow post: # https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma import numpy as np - +import fpzip class Elias: """ Compression API @@ -29,11 +29,12 @@ class Elias: """ arr.sort() - arr = np.diff(arr, prepend=0).astype(np.int32) + first = arr[0] + arr = np.diff(arr).astype(np.int32) arr = arr.view(f'u{arr.itemsize}') l = np.log2(arr).astype('u1') L = ((l << 1) + 1).cumsum() - out = np.zeros(int(L[-1] + 64), 'u1') + out = np.zeros(int(L[-1] + 128), 'u1') for i in range(l.max() + 1): out[L - i - 1] += (arr >> i) & 1 @@ -41,6 +42,9 @@ class Elias: size = np.ndarray(8, dtype='u1', buffer=s.data) packed = np.packbits(out) packed[-8:] = size + s = np.array([first], dtype=np.int64) + size = np.ndarray(8, dtype='u1', buffer=s.data) + packed[-16:-8] = size return packed def decompress(self, bytes): @@ -60,7 +64,9 @@ class Elias: """ n_arr = bytes[-8:] n = np.ndarray(1, dtype=np.int64, buffer=n_arr.data)[0] - b = bytes[:-8] + first = bytes[-16: -8] + first = np.ndarray(1, dtype=np.int64, buffer=first.data)[0] + b = bytes[:-16] b = np.unpackbits(b, count=n).view(bool) s = b.nonzero()[0] s = (s << 1).repeat(np.diff(s, prepend=-1)) @@ -78,10 +84,12 @@ class Elias: offs = np.fromiter(gen(), int) sz = np.diff(offs) >> 1 mx = sz.max() + 1 - out = np.zeros(offs.size - 1, int) + out_fin = np.zeros(offs.size, int) + out_fin[0] = first + out = out_fin[1:] for i in range(mx): out[b[offs[1:] - i - 1] & (sz >= i)] += 1 << i - out = np.cumsum(out) + out = np.cumsum(out_fin) return out diff --git a/src/decentralizepy/compression/EliasFpzip.py b/src/decentralizepy/compression/EliasFpzip.py new file mode 100644 index 0000000..42eb352 --- /dev/null +++ b/src/decentralizepy/compression/EliasFpzip.py @@ -0,0 +1,51 @@ +# elias implementation: taken from this stack overflow post: +# https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma +import numpy as np +import fpzip +from decentralizepy.compression.Elias import Elias + + +class EliasFpzip(Elias): + """ + Compression API + + """ + + def __init__(self): + """ + Constructor + """ + + def compress_float(self, arr): + """ + compression function for float arrays + + Parameters + ---------- + arr : np.ndarray + Data to compress + + Returns + ------- + bytearray + encoded data as bytes + + """ + return fpzip.compress(arr, precision=0, order='C') + + def decompress_float(self, bytes): + """ + decompression function for compressed float arrays + + Parameters + ---------- + bytes :bytearray + compressed data + + Returns + ------- + arr : np.ndarray + decompressed data as array + + """ + return fpzip.decompress(bytes, order='C') \ No newline at end of file -- GitLab From 58801273ab59de37bd9303d04bae13c7efe6c9af Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Fri, 6 May 2022 13:55:42 +0200 Subject: [PATCH 11/17] tcp fixes --- src/decentralizepy/communication/TCP.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/decentralizepy/communication/TCP.py b/src/decentralizepy/communication/TCP.py index 3826fad..8d23204 100644 --- a/src/decentralizepy/communication/TCP.py +++ b/src/decentralizepy/communication/TCP.py @@ -88,7 +88,7 @@ class TCP(Communication): self.router.bind(self.addr(rank, machine_id)) self.sent_disconnections = False self.compress = compress - self.compressor = Elias() + self.compressor = Lz4Wrapper() self.total_data = 0 self.total_meta = 0 @@ -131,8 +131,12 @@ class TCP(Communication): self.total_data += (len(output) - len(data["indices"])) else: output = pickle.dumps(data) - if type(data) == dict and "indices" in data: # otherwise it is a centralized testing message - meta_len = len(pickle.dumps(data["indices"])) + # centralized testing uses its own instance + if type(data) == dict: + if "indices" in data: + meta_len = len(pickle.dumps(data["indices"])) + else: + meta_len = 0 self.total_meta += meta_len self.total_data += (len(output) - meta_len) return output -- GitLab From c7ddc1f555cc47bcbf2805e8a4bda438daf9df1d Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Fri, 6 May 2022 14:46:32 +0200 Subject: [PATCH 12/17] sorting indexes retuned by topk --- src/decentralizepy/communication/TCP.py | 2 +- src/decentralizepy/sharing/FFT.py | 2 +- src/decentralizepy/sharing/PartialModel.py | 6 ++++-- src/decentralizepy/sharing/Wavelet.py | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/decentralizepy/communication/TCP.py b/src/decentralizepy/communication/TCP.py index 8d23204..3389e20 100644 --- a/src/decentralizepy/communication/TCP.py +++ b/src/decentralizepy/communication/TCP.py @@ -88,7 +88,7 @@ class TCP(Communication): self.router.bind(self.addr(rank, machine_id)) self.sent_disconnections = False self.compress = compress - self.compressor = Lz4Wrapper() + self.compressor = Elias() self.total_data = 0 self.total_meta = 0 diff --git a/src/decentralizepy/sharing/FFT.py b/src/decentralizepy/sharing/FFT.py index cef3873..ba7b841 100644 --- a/src/decentralizepy/sharing/FFT.py +++ b/src/decentralizepy/sharing/FFT.py @@ -142,7 +142,7 @@ class FFT(PartialModel): dim=0, sorted=False, ) - + index, _ = torch.sort(index) return flat_fft[index], index def serialized_model(self): diff --git a/src/decentralizepy/sharing/PartialModel.py b/src/decentralizepy/sharing/PartialModel.py index daf9b8d..89f4418 100644 --- a/src/decentralizepy/sharing/PartialModel.py +++ b/src/decentralizepy/sharing/PartialModel.py @@ -147,10 +147,12 @@ class PartialModel(Sharing): std, mean = torch.std_mean(G_topk, unbiased=False) self.std = std.item() self.mean = mean.item() - return torch.topk( - G_topk, round(self.alpha * G_topk.shape[0]), dim=0, sorted=False + _, index = torch.topk( + G_topk, round(self.alpha * G_topk.shape[0]), dim=0, sorted=True ) + index, _ = torch.sort(index) + def serialized_model(self): """ Convert model to a dict. self.alpha specifies the fraction of model to send. diff --git a/src/decentralizepy/sharing/Wavelet.py b/src/decentralizepy/sharing/Wavelet.py index 516fa05..b864f1f 100644 --- a/src/decentralizepy/sharing/Wavelet.py +++ b/src/decentralizepy/sharing/Wavelet.py @@ -164,7 +164,7 @@ class Wavelet(PartialModel): dim=0, sorted=False, ) - + index, _ = torch.sort(index) return data[index], index def serialized_model(self): -- GitLab From 818080477843b19de93a7d8a1b3dabd275d426f0 Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Fri, 6 May 2022 14:51:10 +0200 Subject: [PATCH 13/17] partial model fix --- src/decentralizepy/sharing/PartialModel.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/decentralizepy/sharing/PartialModel.py b/src/decentralizepy/sharing/PartialModel.py index 89f4418..a59b669 100644 --- a/src/decentralizepy/sharing/PartialModel.py +++ b/src/decentralizepy/sharing/PartialModel.py @@ -152,6 +152,7 @@ class PartialModel(Sharing): ) index, _ = torch.sort(index) + return _, index def serialized_model(self): """ -- GitLab From 524a4494f54df689b68ad774bfd49649d3e7f362 Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Sat, 7 May 2022 12:34:44 +0200 Subject: [PATCH 14/17] meta data statistics fix --- src/decentralizepy/communication/TCP.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/decentralizepy/communication/TCP.py b/src/decentralizepy/communication/TCP.py index 3389e20..bbacfc8 100644 --- a/src/decentralizepy/communication/TCP.py +++ b/src/decentralizepy/communication/TCP.py @@ -123,12 +123,13 @@ class TCP(Communication): if self.compress: if "indices" in data: data["indices"] = self.compressor.compress(data["indices"]) + meta_len = len(pickle.dumps(data["indices"])) # ONLY necessary for the statistics if "params" in data: data["params"] = self.compressor.compress_float(data["params"]) output = pickle.dumps(data) # the compressed meta data gets only a few bytes smaller after pickling - self.total_meta += len(data["indices"]) - self.total_data += (len(output) - len(data["indices"])) + self.total_meta += meta_len + self.total_data += (len(output) - meta_len) else: output = pickle.dumps(data) # centralized testing uses its own instance -- GitLab From 1750a5e4ebcce92a3789f024c7dfdc949ee8240b Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Sat, 7 May 2022 21:29:21 +0200 Subject: [PATCH 15/17] testing eval call --- src/decentralizepy/train_test_evaluation.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/decentralizepy/train_test_evaluation.py b/src/decentralizepy/train_test_evaluation.py index 316319d..319d308 100644 --- a/src/decentralizepy/train_test_evaluation.py +++ b/src/decentralizepy/train_test_evaluation.py @@ -36,6 +36,7 @@ class TrainTestHelper: def train_test_evaluation(self, iteration): with torch.no_grad(): + self.model.eval() total_threads = os.cpu_count() torch.set_num_threads(total_threads) @@ -49,7 +50,7 @@ class TrainTestHelper: clone_val = val.clone().detach() state_dict_copy[key] = clone_val flat = clone_val.flatten() - to_cat.append(clone_val.flatten()) + to_cat.append(flat) lens.append(flat.shape[0]) my_weight = torch.cat(to_cat) @@ -90,4 +91,5 @@ class TrainTestHelper: torch.set_num_threads(self.threads_per_proc) for neighbor in neighbors: self.comm.send(neighbor, "finished") - return ta, tl, trl + self.model.train() + return ta, tl, trl -- GitLab From 5f03ba64140a148d7acb6fdcebc78d847869cdcb Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Mon, 9 May 2022 14:08:06 +0200 Subject: [PATCH 16/17] added config options for the compression files --- src/decentralizepy/communication/TCP.py | 23 ++++++--- src/decentralizepy/compression/Compression.py | 2 +- src/decentralizepy/compression/Elias.py | 5 +- .../compression/EliasFpzipLossy.py | 51 +++++++++++++++++++ src/decentralizepy/compression/Lz4Wrapper.py | 5 +- 5 files changed, 76 insertions(+), 10 deletions(-) create mode 100644 src/decentralizepy/compression/EliasFpzipLossy.py diff --git a/src/decentralizepy/communication/TCP.py b/src/decentralizepy/communication/TCP.py index bbacfc8..b00681a 100644 --- a/src/decentralizepy/communication/TCP.py +++ b/src/decentralizepy/communication/TCP.py @@ -1,17 +1,13 @@ import json import logging -import lzma import pickle +import importlib + from collections import deque import zmq -import lz4.frame -from pathlib import Path -import os from decentralizepy.communication.Communication import Communication -from decentralizepy.compression.Lz4Wrapper import Lz4Wrapper -from decentralizepy.compression.Elias import Elias HELLO = b"HELLO" BYE = b"BYE" @@ -52,6 +48,8 @@ class TCP(Communication): addresses_filepath, compress=False, offset=20000, + compression_package=None, + compression_class=None, ): """ Constructor @@ -68,6 +66,10 @@ class TCP(Communication): Total number of processes addresses_filepath : str JSON file with machine_id -> ip mapping + compression_package : str + Import path of a module that implements the compression.Compression.Compression class + compression_class : str + Name of the compression class inside the compression package """ super().__init__(rank, machine_id, mapping, total_procs) @@ -88,7 +90,14 @@ class TCP(Communication): self.router.bind(self.addr(rank, machine_id)) self.sent_disconnections = False self.compress = compress - self.compressor = Elias() + + if compression_package and compression_class: + compressor_module = importlib.import_module(compression_package) + compressor_class = getattr(compressor_module, compression_class) + self.compressor = compressor_class() + logging.info(f"Using the {compressor_class} to compress the data") + else: + assert not self.compress self.total_data = 0 self.total_meta = 0 diff --git a/src/decentralizepy/compression/Compression.py b/src/decentralizepy/compression/Compression.py index 253d808..73303dd 100644 --- a/src/decentralizepy/compression/Compression.py +++ b/src/decentralizepy/compression/Compression.py @@ -1,5 +1,5 @@ import numpy as np -class Communication: +class Compression: """ Compression API diff --git a/src/decentralizepy/compression/Elias.py b/src/decentralizepy/compression/Elias.py index dfdeee5..2f6b3ea 100644 --- a/src/decentralizepy/compression/Elias.py +++ b/src/decentralizepy/compression/Elias.py @@ -2,7 +2,10 @@ # https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma import numpy as np import fpzip -class Elias: +from decentralizepy.compression.Compression import Compression + + +class Elias(Compression): """ Compression API diff --git a/src/decentralizepy/compression/EliasFpzipLossy.py b/src/decentralizepy/compression/EliasFpzipLossy.py new file mode 100644 index 0000000..4d04db5 --- /dev/null +++ b/src/decentralizepy/compression/EliasFpzipLossy.py @@ -0,0 +1,51 @@ +# elias implementation: taken from this stack overflow post: +# https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma +import numpy as np +import fpzip +from decentralizepy.compression.Elias import Elias + + +class EliasFpzipLossy(Elias): + """ + Compression API + + """ + + def __init__(self): + """ + Constructor + """ + + def compress_float(self, arr): + """ + compression function for float arrays + + Parameters + ---------- + arr : np.ndarray + Data to compress + + Returns + ------- + bytearray + encoded data as bytes + + """ + return fpzip.compress(arr, precision=18, order='C') + + def decompress_float(self, bytes): + """ + decompression function for compressed float arrays + + Parameters + ---------- + bytes :bytearray + compressed data + + Returns + ------- + arr : np.ndarray + decompressed data as array + + """ + return fpzip.decompress(bytes, order='C') \ No newline at end of file diff --git a/src/decentralizepy/compression/Lz4Wrapper.py b/src/decentralizepy/compression/Lz4Wrapper.py index 136fea0..6c9c5d2 100644 --- a/src/decentralizepy/compression/Lz4Wrapper.py +++ b/src/decentralizepy/compression/Lz4Wrapper.py @@ -1,6 +1,9 @@ import numpy as np import lz4.frame -class Lz4Wrapper: +from decentralizepy.compression.Compression import Compression + + +class Lz4Wrapper(Compression): """ Compression API -- GitLab From 588b109f16be31b3c4771bf69e270aef26091471 Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger Date: Mon, 9 May 2022 14:10:29 +0200 Subject: [PATCH 17/17] reformatting --- src/decentralizepy/communication/TCP.py | 15 ++++++++------- src/decentralizepy/compression/Compression.py | 5 +++-- src/decentralizepy/compression/Elias.py | 18 +++++++++--------- src/decentralizepy/compression/EliasFpzip.py | 7 ++++--- .../compression/EliasFpzipLossy.py | 7 ++++--- src/decentralizepy/compression/Lz4Wrapper.py | 7 ++++--- src/decentralizepy/node/Node.py | 4 +++- src/decentralizepy/sharing/PartialModel.py | 2 +- src/decentralizepy/sharing/Sharing.py | 2 +- 9 files changed, 37 insertions(+), 30 deletions(-) diff --git a/src/decentralizepy/communication/TCP.py b/src/decentralizepy/communication/TCP.py index b00681a..c609699 100644 --- a/src/decentralizepy/communication/TCP.py +++ b/src/decentralizepy/communication/TCP.py @@ -1,13 +1,13 @@ +import importlib import json import logging import pickle -import importlib - from collections import deque import zmq from decentralizepy.communication.Communication import Communication + HELLO = b"HELLO" BYE = b"BYE" @@ -102,7 +102,6 @@ class TCP(Communication): self.total_data = 0 self.total_meta = 0 - self.peer_deque = deque() self.peer_sockets = dict() self.barrier = set() @@ -132,13 +131,15 @@ class TCP(Communication): if self.compress: if "indices" in data: data["indices"] = self.compressor.compress(data["indices"]) - meta_len = len(pickle.dumps(data["indices"])) # ONLY necessary for the statistics + meta_len = len( + pickle.dumps(data["indices"]) + ) # ONLY necessary for the statistics if "params" in data: data["params"] = self.compressor.compress_float(data["params"]) output = pickle.dumps(data) # the compressed meta data gets only a few bytes smaller after pickling self.total_meta += meta_len - self.total_data += (len(output) - meta_len) + self.total_data += len(output) - meta_len else: output = pickle.dumps(data) # centralized testing uses its own instance @@ -148,7 +149,7 @@ class TCP(Communication): else: meta_len = 0 self.total_meta += meta_len - self.total_data += (len(output) - meta_len) + self.total_data += len(output) - meta_len return output def decrypt(self, sender, data): @@ -262,7 +263,7 @@ class TCP(Communication): logging.debug("Received message from {}".format(sender)) return self.decrypt(sender, recv) - def send(self, uid, data, encrypt = True): + def send(self, uid, data, encrypt=True): """ Send a message to a process. diff --git a/src/decentralizepy/compression/Compression.py b/src/decentralizepy/compression/Compression.py index 73303dd..0924caf 100644 --- a/src/decentralizepy/compression/Compression.py +++ b/src/decentralizepy/compression/Compression.py @@ -1,4 +1,6 @@ import numpy as np + + class Compression: """ Compression API @@ -44,7 +46,6 @@ class Compression: """ raise NotImplementedError - def compress_float(self, arr): """ compression function for float arrays @@ -77,4 +78,4 @@ class Compression: decompressed data as array """ - raise NotImplementedError \ No newline at end of file + raise NotImplementedError diff --git a/src/decentralizepy/compression/Elias.py b/src/decentralizepy/compression/Elias.py index 2f6b3ea..235cf00 100644 --- a/src/decentralizepy/compression/Elias.py +++ b/src/decentralizepy/compression/Elias.py @@ -1,7 +1,8 @@ # elias implementation: taken from this stack overflow post: # https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma -import numpy as np import fpzip +import numpy as np + from decentralizepy.compression.Compression import Compression @@ -34,19 +35,19 @@ class Elias(Compression): arr.sort() first = arr[0] arr = np.diff(arr).astype(np.int32) - arr = arr.view(f'u{arr.itemsize}') - l = np.log2(arr).astype('u1') + arr = arr.view(f"u{arr.itemsize}") + l = np.log2(arr).astype("u1") L = ((l << 1) + 1).cumsum() - out = np.zeros(int(L[-1] + 128), 'u1') + out = np.zeros(int(L[-1] + 128), "u1") for i in range(l.max() + 1): out[L - i - 1] += (arr >> i) & 1 s = np.array([out.size], dtype=np.int64) - size = np.ndarray(8, dtype='u1', buffer=s.data) + size = np.ndarray(8, dtype="u1", buffer=s.data) packed = np.packbits(out) packed[-8:] = size s = np.array([first], dtype=np.int64) - size = np.ndarray(8, dtype='u1', buffer=s.data) + size = np.ndarray(8, dtype="u1", buffer=s.data) packed[-16:-8] = size return packed @@ -67,7 +68,7 @@ class Elias(Compression): """ n_arr = bytes[-8:] n = np.ndarray(1, dtype=np.int64, buffer=n_arr.data)[0] - first = bytes[-16: -8] + first = bytes[-16:-8] first = np.ndarray(1, dtype=np.int64, buffer=first.data)[0] b = bytes[:-16] b = np.unpackbits(b, count=n).view(bool) @@ -95,7 +96,6 @@ class Elias(Compression): out = np.cumsum(out_fin) return out - def compress_float(self, arr): """ compression function for float arrays @@ -128,4 +128,4 @@ class Elias(Compression): decompressed data as array """ - return bytes \ No newline at end of file + return bytes diff --git a/src/decentralizepy/compression/EliasFpzip.py b/src/decentralizepy/compression/EliasFpzip.py index 42eb352..dc1413a 100644 --- a/src/decentralizepy/compression/EliasFpzip.py +++ b/src/decentralizepy/compression/EliasFpzip.py @@ -1,7 +1,8 @@ # elias implementation: taken from this stack overflow post: # https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma -import numpy as np import fpzip +import numpy as np + from decentralizepy.compression.Elias import Elias @@ -31,7 +32,7 @@ class EliasFpzip(Elias): encoded data as bytes """ - return fpzip.compress(arr, precision=0, order='C') + return fpzip.compress(arr, precision=0, order="C") def decompress_float(self, bytes): """ @@ -48,4 +49,4 @@ class EliasFpzip(Elias): decompressed data as array """ - return fpzip.decompress(bytes, order='C') \ No newline at end of file + return fpzip.decompress(bytes, order="C") diff --git a/src/decentralizepy/compression/EliasFpzipLossy.py b/src/decentralizepy/compression/EliasFpzipLossy.py index 4d04db5..30e0111 100644 --- a/src/decentralizepy/compression/EliasFpzipLossy.py +++ b/src/decentralizepy/compression/EliasFpzipLossy.py @@ -1,7 +1,8 @@ # elias implementation: taken from this stack overflow post: # https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma -import numpy as np import fpzip +import numpy as np + from decentralizepy.compression.Elias import Elias @@ -31,7 +32,7 @@ class EliasFpzipLossy(Elias): encoded data as bytes """ - return fpzip.compress(arr, precision=18, order='C') + return fpzip.compress(arr, precision=18, order="C") def decompress_float(self, bytes): """ @@ -48,4 +49,4 @@ class EliasFpzipLossy(Elias): decompressed data as array """ - return fpzip.decompress(bytes, order='C') \ No newline at end of file + return fpzip.decompress(bytes, order="C") diff --git a/src/decentralizepy/compression/Lz4Wrapper.py b/src/decentralizepy/compression/Lz4Wrapper.py index 6c9c5d2..cb65773 100644 --- a/src/decentralizepy/compression/Lz4Wrapper.py +++ b/src/decentralizepy/compression/Lz4Wrapper.py @@ -1,5 +1,6 @@ -import numpy as np import lz4.frame +import numpy as np + from decentralizepy.compression.Compression import Compression @@ -9,7 +10,7 @@ class Lz4Wrapper(Compression): """ - def __init__(self, compress_metadata = True, compress_data = False): + def __init__(self, compress_metadata=True, compress_data=False): """ Constructor """ @@ -96,4 +97,4 @@ class Lz4Wrapper(Compression): if self.compress_data: decomp = lz4.frame.decompress(bytes) return np.frombuffer(decomp, dtype=np.float32) - return bytes \ No newline at end of file + return bytes diff --git a/src/decentralizepy/node/Node.py b/src/decentralizepy/node/Node.py index 4c93a00..dd1b9d9 100644 --- a/src/decentralizepy/node/Node.py +++ b/src/decentralizepy/node/Node.py @@ -434,7 +434,9 @@ class Node: results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes if hasattr(self.communication, "total_meta"): - results_dict["total_meta"][iteration + 1] = self.communication.total_meta + results_dict["total_meta"][ + iteration + 1 + ] = self.communication.total_meta if hasattr(self.communication, "total_data"): results_dict["total_data_per_n"][ iteration + 1 diff --git a/src/decentralizepy/sharing/PartialModel.py b/src/decentralizepy/sharing/PartialModel.py index a59b669..1f1feca 100644 --- a/src/decentralizepy/sharing/PartialModel.py +++ b/src/decentralizepy/sharing/PartialModel.py @@ -147,7 +147,7 @@ class PartialModel(Sharing): std, mean = torch.std_mean(G_topk, unbiased=False) self.std = std.item() self.mean = mean.item() - _, index = torch.topk( + _, index = torch.topk( G_topk, round(self.alpha * G_topk.shape[0]), dim=0, sorted=True ) diff --git a/src/decentralizepy/sharing/Sharing.py b/src/decentralizepy/sharing/Sharing.py index 0e77008..22b4de9 100644 --- a/src/decentralizepy/sharing/Sharing.py +++ b/src/decentralizepy/sharing/Sharing.py @@ -179,7 +179,7 @@ class Sharing: data["iteration"] = self.communication_round encrypted = self.communication.encrypt(data) for neighbor in iter_neighbors: - self.communication.send(neighbor, encrypted, encrypt = False) + self.communication.send(neighbor, encrypted, encrypt=False) logging.info("Waiting for messages from neighbors") while not self.received_from_all(): -- GitLab