import json
import logging
import math
import random
import torch
from decentralizepy.sharing.Sharing import Sharing
class RoundRobinPartial(Sharing):
    """
    Round-robin partial model sharing.

    The flattened model parameters are divided into contiguous blocks of
    ``ceil(alpha * n_params)`` elements. Each communication round a node
    sends exactly one block and then advances to the next block
    (wrapping around), so over ``num_blocks`` rounds the full model is
    shared.
    """

    def __init__(
        self,
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        alpha=1.0,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet.
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        alpha : float
            Fraction of the model to share each round; must be in (0, 1].

        Raises
        ------
        ValueError
            If ``alpha`` is not strictly positive (previously this
            surfaced as an opaque ZeroDivisionError).
        """
        super().__init__(
            rank, machine_id, communication, mapping, graph, model, dataset, log_dir
        )
        if alpha <= 0:
            raise ValueError(
                "alpha must be strictly positive, got {}".format(alpha)
            )
        self.alpha = alpha
        # Seed with this node's uid so every node starts at a different,
        # but reproducible, block offset.
        random.seed(self.mapping.get_uid(rank, machine_id))
        n_params = self.model.count_params()
        logging.info("Total number of parameters: {}".format(n_params))
        self.block_size = math.ceil(self.alpha * n_params)
        logging.info("Block_size: {}".format(self.block_size))
        self.num_blocks = math.ceil(n_params / self.block_size)
        # Fix: previously logged n_params here instead of the block count.
        logging.info("Total number of blocks: {}".format(self.num_blocks))
        self.current_block = random.randint(0, self.num_blocks - 1)

    def serialized_model(self):
        """
        Convert the current block of the model to a json-serializable dict.
        ``self.alpha`` (via ``self.block_size``) determines the fraction of
        the model sent. Advances ``self.current_block`` as a side effect.

        Returns
        -------
        dict
            Keys "block_start", "block_end", and "params", each
            json-encoded (params is a flat list of floats).
        """
        with torch.no_grad():
            logging.info("Extracting params to send")
            tensors_to_cat = [v.data.flatten() for v in self.model.parameters()]
            T = torch.cat(tensors_to_cat, dim=0)
            block_start = self.current_block * self.block_size
            # The last block may be shorter than block_size.
            block_end = min(T.shape[0], (self.current_block + 1) * self.block_size)
            # Move to the next block for the following round (wraps around).
            self.current_block = (self.current_block + 1) % self.num_blocks
            T_send = T[block_start:block_end]
            logging.info("Range sending: {}-{}".format(block_start, block_end))
            logging.info("Generating dictionary to send")
            m = dict()
            m["block_start"] = block_start
            m["block_end"] = block_end
            m["params"] = T_send.numpy().tolist()
            logging.info("Elements sending: {}".format(len(m["params"])))
            logging.info("Generated dictionary to send")
            for key in m:
                m[key] = json.dumps(m[key])
            logging.info("Converted dictionary to json")
            # NOTE: only the encrypted params payload is counted towards
            # total_data; the block_start/block_end overhead is excluded.
            self.total_data += len(self.communication.encrypt(m["params"]))
            return m

    def deserialized_model(self, m):
        """
        Convert a received json dict back into a state_dict, overwriting
        only the slice [block_start, block_end) of the flattened model
        with the received values.

        Parameters
        ----------
        m : dict
            json dict received (as produced by ``serialized_model``)

        Returns
        -------
        state_dict
            state_dict with the received block spliced in
        """
        with torch.no_grad():
            state_dict = self.model.state_dict()
            shapes = []
            lens = []
            tensors_to_cat = []
            # Remember each tensor's shape and flattened length so the
            # flat vector can be re-partitioned afterwards.
            for _, v in state_dict.items():
                shapes.append(v.shape)
                t = v.flatten()
                lens.append(t.shape[0])
                tensors_to_cat.append(t)
            T = torch.cat(tensors_to_cat, dim=0)
            block_start = json.loads(m["block_start"])
            block_end = json.loads(m["block_end"])
            T[block_start:block_end] = torch.tensor(json.loads(m["params"]))
            # Re-split the flat vector into per-parameter tensors.
            start_index = 0
            for i, key in enumerate(state_dict):
                end_index = start_index + lens[i]
                state_dict[key] = T[start_index:end_index].reshape(shapes[i])
                start_index = end_index
            return state_dict