import logging

import torch
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch import nn
from torch.utils.data import DataLoader

from decentralizepy.datasets.Dataset import Dataset
from decentralizepy.datasets.Partitioner import DataPartitioner, SimpleDataPartitioner
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.models.Model import Model

NUM_CLASSES = 10


class CIFAR10(Dataset):
    """
    Class for the CIFAR-10 dataset
    """

    def load_trainset(self):
        """
        Loads the training set. Partitions it if needed.
        """
        logging.info("Loading training set.")
        trainset = torchvision.datasets.CIFAR10(
            root=self.train_dir, train=True, download=True, transform=self.transform
        )
        c_len = len(trainset)

        if self.sizes is None:
            # Equal distribution of data among processes
            e = c_len // self.n_procs
            frac = e / c_len
            self.sizes = [frac] * self.n_procs
            self.sizes[-1] += 1.0 - frac * self.n_procs
            logging.debug("Size fractions: {}".format(self.sizes))

        self.uid = self.mapping.get_uid(self.rank, self.machine_id)

        if not self.partition_niid:
            self.trainset = DataPartitioner(trainset, self.sizes).use(self.uid)
        else:
            # Group the samples by label before partitioning, so each
            # partition covers only a few classes: a non-IID split.
            train_data = {key: [] for key in range(NUM_CLASSES)}
            for x, y in trainset:
                train_data[y].append(x)
            all_trainset = []
            for y, x in train_data.items():
                all_trainset.extend([(a, y) for a in x])
            self.trainset = SimpleDataPartitioner(all_trainset, self.sizes).use(
                self.uid
            )

    def load_testset(self):
        """
        Loads the testing set.
        """
        logging.info("Loading testing set.")
        self.testset = torchvision.datasets.CIFAR10(
            root=self.test_dir, train=False, download=True, transform=self.transform
        )

    def __init__(
        self,
        rank: int,
        machine_id: int,
        mapping: Mapping,
        n_procs="",
        train_dir="",
        test_dir="",
        sizes="",
        test_batch_size=1024,
        partition_niid=False,
    ):
        """
        Constructor which reads the data files, instantiates and partitions the dataset

        Parameters
        ----------
        rank : int
            Rank of the current process (to get the partition).
        machine_id : int
            Machine ID
        mapping : decentralizepy.mappings.Mapping
            Mapping to convert rank, machine_id -> uid for data partitioning.
            It also provides the total number of global processes.
        n_procs : int, optional
            Total number of global processes.
        train_dir : str, optional
            Path to the training data files. Required to instantiate the training set.
            The training set is partitioned according to the number of global
            processes and sizes.
        test_dir : str, optional
            Path to the testing data files. Required to instantiate the testing set.
        sizes : list(float), optional
            A list of fractions specifying how much data to allot each process.
            The fractions should sum to 1.0. By default, each process gets an
            equal amount.
        test_batch_size : int, optional
            Batch size during testing.
            Default value is 1024.
        partition_niid : bool, optional
            When True, partitions the dataset in a non-IID (label-sorted) way.

        """
        super().__init__(
            rank,
            machine_id,
            mapping,
            train_dir,
            test_dir,
            sizes,
            test_batch_size,
        )

        self.partition_niid = partition_niid
        self.transform = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
            ]
        )

        if self.__training__:
            self.load_trainset()

        if self.__testing__:
            self.load_testset()

        # TODO: Add Validation

    def get_trainset(self, batch_size=1, shuffle=False):
        """
        Function to get the training set

        Parameters
        ----------
        batch_size : int, optional
            Batch size for learning
        shuffle : bool, optional
            Whether to shuffle the training set

        Returns
        -------
        torch.utils.data.DataLoader

        Raises
        ------
        RuntimeError
            If the training set was not initialized

        """
        if self.__training__:
            return DataLoader(self.trainset, batch_size=batch_size, shuffle=shuffle)
        raise RuntimeError("Training set not initialized!")

    def get_testset(self):
        """
        Function to get the test set

        Returns
        -------
        torch.utils.data.DataLoader

        Raises
        ------
        RuntimeError
            If the test set was not initialized

        """
        if self.__testing__:
            return DataLoader(self.testset, batch_size=self.test_batch_size)
        raise RuntimeError("Test set not initialized!")

    def test(self, model, loss):
        """
        Function to evaluate model on the test dataset.

        Parameters
        ----------
        model : decentralizepy.models.Model
            Model to evaluate
        loss : torch.nn.Module
            Loss function to evaluate

        Returns
        -------
        tuple
            (accuracy, loss_value)

        """
        testloader = self.get_testset()

        logging.debug("Test Loader instantiated.")

        correct_pred = [0 for _ in range(NUM_CLASSES)]
        total_pred = [0 for _ in range(NUM_CLASSES)]

        total_correct = 0
        total_predicted = 0

        with torch.no_grad():
            loss_val = 0.0
            count = 0
            for elems, labels in testloader:
                outputs = model(elems)
                loss_val += loss(outputs, labels).item()
                count += 1
                _, predictions = torch.max(outputs, 1)
                for label, prediction in zip(labels, predictions):
                    logging.debug("{} predicted as {}".format(label, prediction))
                    if label == prediction:
                        correct_pred[label] += 1
                        total_correct += 1
                    total_pred[label] += 1
                    total_predicted += 1

        logging.debug("Predicted on the test set")

        for key, value in enumerate(correct_pred):
            if total_pred[key] != 0:
                accuracy = 100 * float(value) / total_pred[key]
            else:
                # No test samples of this class were seen.
                accuracy = 100.0
            logging.debug("Accuracy for class {} is: {:.1f} %".format(key, accuracy))

        accuracy = 100 * float(total_correct) / total_predicted
        loss_val = loss_val / count
        logging.info("Overall accuracy is: {:.1f} %".format(accuracy))
        return accuracy, loss_val


class CNN(Model):
    """
    Class for a CNN Model for CIFAR10
    """

    def __init__(self):
        """
        Constructor. Instantiates the CNN Model
            with 10 output classes
        """
        super().__init__()
        # LeNet-5-style architecture, about 62K parameters in total
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, NUM_CLASSES)

    def forward(self, x):
        """
        Forward pass of the model

        Parameters
        ----------
        x : torch.tensor
            The input torch tensor

        Returns
        -------
        torch.tensor
            The output torch tensor

        """
        # 3x32x32 -> conv1 -> 6x28x28 -> pool -> 6x14x14
        x = self.pool(F.relu(self.conv1(x)))
        # 6x14x14 -> conv2 -> 16x10x10 -> pool -> 16x5x5
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1)  # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
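

# ----------------------------------------------------------------------------
# Illustrative addition (not part of decentralizepy): a minimal sanity check of
# the forward-pass shapes above. 32x32 CIFAR-10 inputs shrink to 28x28 after
# conv1 (5x5 kernel, no padding), 14x14 after pooling, 10x10 after conv2, and
# 5x5 after the second pooling, which is why fc1 expects 16 * 5 * 5 features.
def _sanity_check_cnn():
    model = CNN()
    batch = torch.randn(4, 3, 32, 32)  # four random tensors shaped like CIFAR-10 images
    logits = model(batch)
    assert logits.shape == (4, NUM_CLASSES)
    return logits.shape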
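

# A hedged end-to-end sketch: instantiate the dataset for a single local
# process and evaluate an untrained CNN on the test set (expect roughly 10%
# accuracy). It assumes decentralizepy provides a Linear mapping at
# decentralizepy.mappings.Linear with the signature
# Linear(n_machines, procs_per_machine); verify against your decentralizepy
# version before relying on it. Running this downloads CIFAR-10 into ./data.
if __name__ == "__main__":
    from decentralizepy.mappings.Linear import Linear  # assumed import path

    mapping = Linear(1, 1)  # one machine with one process -> uid 0
    dataset = CIFAR10(
        rank=0,
        machine_id=0,
        mapping=mapping,
        train_dir="./data",
        test_dir="./data",
    )
    model = CNN()
    accuracy, loss_val = dataset.test(model, nn.CrossEntropyLoss())
    print("Accuracy: {:.1f} %, loss: {:.3f}".format(accuracy, loss_val))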