forked from ashleve/lightning-hydra-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmnist_datamodule.py
147 lines (121 loc) · 5.57 KB
/
mnist_datamodule.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from typing import Optional, Tuple
from sklearn.model_selection import KFold, StratifiedKFold
import torch
from pytorch_lightning import LightningDataModule
from torch.utils.data import ConcatDataset, DataLoader, Dataset, random_split, Subset
from torchvision.datasets import MNIST
from torchvision.transforms import transforms
class MNISTDataModule(LightningDataModule):
    """
    Example of LightningDataModule for MNIST dataset.

    A DataModule implements 5 key methods:
        - prepare_data (things to do on 1 GPU/TPU, not on every GPU/TPU in distributed mode)
        - setup (things to do on every accelerator in distributed mode)
        - train_dataloader (the training dataloader)
        - val_dataloader (the validation dataloader(s))
        - test_dataloader (the test dataloader(s))

    This allows you to share a full dataset without explaining how to download,
    split, transform and process the data.

    On top of the standard hooks, `get_splits` yields K-fold (optionally
    stratified) pairs of train/validation dataloaders built from the train and
    validation splits; the test split is never part of a fold.

    Read the docs:
        https://pytorch-lightning.readthedocs.io/en/latest/extensions/datamodules.html
    """

    def __init__(
        self,
        data_dir: str = "data/",
        train_val_test_split: Tuple[int, int, int] = (55_000, 5_000, 10_000),
        batch_size: int = 64,
        num_workers: int = 0,
        pin_memory: bool = False,
        stratify: bool = False,
        n_splits: int = 5,
    ):
        """Store the hyperparameters and build the input transform.

        Args:
            data_dir: Directory passed to `MNIST` as the dataset root.
            train_val_test_split: Split lengths passed to `random_split` in `setup`.
            batch_size: Batch size of every dataloader created by this module.
            num_workers: `num_workers` of every dataloader created by this module.
            pin_memory: `pin_memory` of every dataloader created by this module.
            stratify: If True, `get_splits` uses `StratifiedKFold` on the labels,
                otherwise a plain `KFold`.
            n_splits: Number of folds produced by `get_splits`.
        """
        super().__init__()

        # this line allows to access init params with 'self.hparams' attribute
        # it also ensures init params will be stored in ckpt
        self.save_hyperparameters(logger=False)

        # data transformations
        self.transforms = transforms.Compose(
            [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
        )

        # self.dims is returned when you call datamodule.size()
        self.dims = (1, 28, 28)

        self.data_train: Optional[Dataset] = None
        self.data_val: Optional[Dataset] = None
        self.data_test: Optional[Dataset] = None

    @property
    def num_classes(self) -> int:
        """Number of target classes."""
        return 10

    def prepare_data(self):
        """Download data if needed. This method is called only from a single GPU.

        Do not use it to assign state (self.x = y).
        """
        MNIST(self.hparams.data_dir, train=True, download=True)
        MNIST(self.hparams.data_dir, train=False, download=True)

    def setup(self, stage: Optional[str] = None):
        """Load data. Set variables: `self.data_train`, `self.data_val`, `self.data_test`.

        This method is called by lightning twice for `trainer.fit()` and `trainer.test()`,
        so be careful if you do a random split!
        The `stage` can be used to differentiate whether it's called before
        `trainer.fit()` or `trainer.test()`.
        """
        # Load datasets only if they're not loaded already. Compare against None
        # explicitly: truthiness of a sized dataset calls `__len__`, which is not
        # what "already loaded" means.
        already_loaded = (
            self.data_train is not None
            or self.data_val is not None
            or self.data_test is not None
        )
        if already_loaded:
            return

        trainset = MNIST(self.hparams.data_dir, train=True, transform=self.transforms)
        testset = MNIST(self.hparams.data_dir, train=False, transform=self.transforms)
        dataset = ConcatDataset(datasets=[trainset, testset])

        # Fixed seed so that repeated calls / processes produce the same split.
        self.data_train, self.data_val, self.data_test = random_split(
            dataset=dataset,
            lengths=self.hparams.train_val_test_split,
            generator=torch.Generator().manual_seed(42),
        )

    def _make_loader(self, dataset, shuffle: bool) -> DataLoader:
        """Build a DataLoader over `dataset` using this module's hyperparameters."""
        return DataLoader(
            dataset=dataset,
            batch_size=self.hparams.batch_size,
            num_workers=self.hparams.num_workers,
            pin_memory=self.hparams.pin_memory,
            shuffle=shuffle,
        )

    def train_dataloader(self):
        """Return the shuffled dataloader over `self.data_train`."""
        return self._make_loader(self.data_train, shuffle=True)

    def val_dataloader(self):
        """Return the unshuffled dataloader over `self.data_val`."""
        return self._make_loader(self.data_val, shuffle=False)

    def test_dataloader(self):
        """Return the unshuffled dataloader over `self.data_test`."""
        return self._make_loader(self.data_test, shuffle=False)

    def get_splits(self):
        """Yield `(train_loader, val_loader)` pairs for K-fold cross-validation.

        Calls `prepare_data()` and `setup()` first, then splits the dataset
        returned by `get_dataset()` into `hparams.n_splits` folds. With
        `hparams.stratify` the folds are produced by `StratifiedKFold` on the
        sample labels, otherwise by `KFold`.

        Yields:
            Tuple of two DataLoaders: a shuffled one over the fold's training
            indices and an unshuffled one over its validation indices.
        """
        self.prepare_data()
        self.setup()

        if self.hparams.stratify:
            labels = self.get_data_labels()
            cv_ = StratifiedKFold(n_splits=self.hparams.n_splits)
        else:
            labels = None
            cv_ = KFold(n_splits=self.hparams.n_splits)

        dataset = self.get_dataset()
        n_samples = len(dataset)

        # Only the number of samples matters to the splitter, so a range stands
        # in for the feature matrix.
        for train_idx, val_idx in cv_.split(X=range(n_samples), y=labels):
            # The splitter yields NumPy index arrays; hand plain int lists to Subset.
            fold_train = Subset(dataset, train_idx.tolist())
            fold_val = Subset(dataset, val_idx.tolist())
            yield (
                self._make_loader(fold_train, shuffle=True),
                self._make_loader(fold_val, shuffle=False),
            )

    def get_dataset(self):
        """Creates and returns the complete dataset (train split followed by val split)."""
        return ConcatDataset([self.data_train, self.data_val])

    @staticmethod
    def _stored_labels(split):
        """Return the labels of `split` without loading any image, or None.

        Works only for the layout produced by `setup`: a `Subset` over a
        `ConcatDataset` whose members expose a `targets` sequence and apply no
        `target_transform`. Any other layout returns None so the caller can
        fall back to reading the samples.
        """
        if not isinstance(split, Subset) or not isinstance(split.dataset, ConcatDataset):
            return None

        members = split.dataset.datasets
        for member in members:
            if getattr(member, "targets", None) is None:
                return None
            # A target transform would make stored targets differ from sample labels.
            if getattr(member, "target_transform", None) is not None:
                return None

        all_targets = torch.cat([torch.as_tensor(member.targets) for member in members])
        picked = torch.as_tensor(split.indices, dtype=torch.long)
        return [int(label) for label in all_targets[picked].tolist()]

    def get_data_labels(self):
        """Return the integer label of every sample of `get_dataset()`, in order.

        Labels are read from the stored targets when the splits have the layout
        created by `setup`; this avoids decoding and transforming every image
        just to look at its label. Otherwise each sample is loaded and its
        second element is used as the label.
        """
        labels = []
        for split in (self.data_train, self.data_val):
            split_labels = self._stored_labels(split)
            if split_labels is None:
                # Unknown layout: read the label from each sample.
                return [int(sample[1]) for sample in self.get_dataset()]
            labels.extend(split_labels)
        return labels