From 7c25fbfe513ddc70b5409334014b6c8e34dccea3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bojan=20Karla=C5=A1?= Date: Mon, 11 Dec 2023 20:23:53 -0500 Subject: [PATCH] Introduce caching of pipeline outputs over entire datasets. --- experiments/datascope/experiments/base.py | 12 + .../experiments/datasets/__init__.py | 10 + .../datascope/experiments/datasets/base.py | 318 +++++++++++++++--- experiments/datascope/experiments/main.py | 42 ++- .../datascope/experiments/pipelines/base.py | 3 + .../experiments/scenarios/data_discard.py | 9 +- .../scenarios/datascope_scenario.py | 15 + .../experiments/scenarios/label_repair.py | 14 +- experiments/requirements.txt | 1 + 9 files changed, 365 insertions(+), 59 deletions(-) diff --git a/experiments/datascope/experiments/base.py b/experiments/datascope/experiments/base.py index bb15005..746f465 100644 --- a/experiments/datascope/experiments/base.py +++ b/experiments/datascope/experiments/base.py @@ -11,6 +11,9 @@ from tqdm import tqdm from typing import Any, Optional, Sequence, Tuple +from .datasets import DEFAULT_BATCH_SIZE, DEFAULT_CACHE_DIR, Dataset +from .pipelines import Pipeline + from .scenarios import ( Study, Scenario, @@ -218,3 +221,12 @@ def report( with Pool(processes=None, initializer=_init_pool, initargs=(lock,)) as pool: for result in tqdm(pool.imap_unordered(_report_generator, item_args), desc="Reports", total=len(reports)): tqdm.write(result) + + +def cache_pipeline( + dataset: str, pipeline: str, cache_dir: str = DEFAULT_CACHE_DIR, batch_size: int = DEFAULT_BATCH_SIZE, **kwargs +) -> None: + cls_dataset = Dataset.datasets[dataset] + inst_dataset = cls_dataset() + inst_pipeline = Pipeline.pipelines[pipeline].construct(dataset=inst_dataset) + cls_dataset.construct_apply_cache(pipeline=inst_pipeline, cache_dir=cache_dir, batch_size=batch_size) diff --git a/experiments/datascope/experiments/datasets/__init__.py b/experiments/datascope/experiments/datasets/__init__.py index cb029b1..1bbad9b 100644 --- a/experiments/datascope/experiments/datasets/__init__.py +++ b/experiments/datascope/experiments/datasets/__init__.py @@ -24,7 +24,12 @@ DEFAULT_BIAS_METHOD, KEYWORD_REPLACEMENTS, DEFAULT_AUGMENT_FACTOR, + DEFAULT_CACHE_DIR, + DEFAULT_BATCH_SIZE, preload_datasets, + batched_pipeline_transform, + save_cached_features, + load_cached_features, ) __all__ = [ @@ -54,5 +59,10 @@ "DEFAULT_BIAS_METHOD", "KEYWORD_REPLACEMENTS", "DEFAULT_AUGMENT_FACTOR", + "DEFAULT_CACHE_DIR", + "DEFAULT_BATCH_SIZE", "preload_datasets", + "batched_pipeline_transform", + "save_cached_features", + "load_cached_features", ] diff --git a/experiments/datascope/experiments/datasets/base.py b/experiments/datascope/experiments/datasets/base.py index e3cecff..c7d4d23 100644 --- a/experiments/datascope/experiments/datasets/base.py +++ b/experiments/datascope/experiments/datasets/base.py @@ -11,6 +11,8 @@ import pickle import pyarrow import requests +import shutil +import tables as tb import tarfile import torch @@ -25,14 +27,15 @@ from joblib.hashing import NumpyHasher from math import ceil, floor from numpy import ndarray -from pandas import DataFrame, Series +from pandas import DataFrame, Series, Index from PIL import Image from pyarrow import parquet as pq +from scipy.sparse import issparse, spmatrix from sklearn.datasets import fetch_openml, fetch_20newsgroups, make_classification from sklearn.model_selection import train_test_split from sklearn.pipeline import Pipeline from sklearn.preprocessing import LabelEncoder -from tqdm import tqdm +from tqdm.auto import tqdm from typing 
import Dict, List, Optional, Sequence, Tuple, Type, Union, Callable, Hashable
 from zipfile import ZipFile
 
@@ -67,6 +70,7 @@ class DatasetModality(str, Enum):
 DEFAULT_CLASSES = [0, 6]
 DEFAULT_DATA_DIR = os.path.join("var", "data")
 DEFAULT_CACHE_DIR = os.path.join(DEFAULT_DATA_DIR, "applycache")
+DEFAULT_BATCH_SIZE = 1024
 
 
 def download(url: str, filename: str, chunk_size=1024):
@@ -166,6 +170,46 @@ def _apply_pipeline(dataset: "Dataset", pipeline: Pipeline, **kwargs) -> "Datase
     return dataset
 
 
+def batched_pipeline_transform(
+    X: ndarray, pipeline: Pipeline, batch_size: int = DEFAULT_BATCH_SIZE, desc: Optional[str] = None
+) -> ndarray:
+    # Push the data through the (already fitted) pipeline in fixed-size batches to bound peak memory usage.
+    idx_max = X.shape[0]
+    X_batches: List[ndarray] = []
+    pbar = tqdm(total=idx_max, desc=desc)
+    for idx_from in range(0, idx_max, batch_size):
+        idx_to = min(idx_from + batch_size, idx_max)
+        X_batch = pipeline.transform(X[idx_from:idx_to])
+        if issparse(X_batch):
+            assert isinstance(X_batch, spmatrix)
+            # Densify sparse pipeline outputs so that all batches can be concatenated into a single ndarray.
+            X_batch = np.asarray(X_batch.todense())
+        X_batches.append(X_batch)
+        pbar.update(idx_to - idx_from)
+    pbar.close()
+    return np.concatenate(X_batches, axis=0)
+
+
+def save_cached_features(X: ndarray, targetdir: str, name: str) -> str:
+    # Store the features as a single-column HDF5 table so that individual rows can later be read by coordinate.
+    shape, sctype = X.shape[1:], X.dtype.type
+    filename = os.path.join(targetdir, "%s.hdf5" % name)
+    file = tb.open_file(filename, "w")
+    table = file.create_table(
+        file.root, "apply_cache", description={"features": tb.Col.from_sctype(sctype, shape=shape)}
+    )
+    table.append(X)
+    file.close()
+    tqdm.write("Transformed features saved as %s" % filename)
+    return filename
+
+
+def load_cached_features(idx: ndarray, targetdir: str, name: str) -> ndarray:
+    filename = os.path.join(targetdir, "%s.hdf5" % name)
+    file = tb.open_file(filename, "r")
+    table = file.get_node("/apply_cache")
+    # Read only the requested rows, in the order in which their indices are given.
+    result = np.array(table.read_coordinates(idx, field="features"))
+    file.close()
+    return result
+
+
 class Dataset(ABC):
     datasets: Dict[str, Type["Dataset"]] = {}
     _dataset: Optional[str] = None
@@ -183,12 +227,15 @@ def __init__(
         self._testsize = testsize
         self._seed = seed
         self._loaded: bool = False
+        self._idx_train: Optional[Index] = None
         self._X_train: Optional[ndarray] = None
         self._y_train: Optional[ndarray] = None
         self._metadata_train: Optional[DataFrame] = None
+        self._idx_val: Optional[Index] = None
         self._X_val: Optional[ndarray] = None
         self._y_val: Optional[ndarray] = None
         self._metadata_val: Optional[DataFrame] = None
+        self._idx_test: Optional[Index] = None
         self._X_test: Optional[ndarray] = None
         self._y_test: Optional[ndarray] = None
         self._metadata_test: Optional[DataFrame] = None
@@ -230,6 +277,10 @@ def valsize(self) -> int:
     def testsize(self) -> int:
         return self._testsize
 
+    @property
+    def idx_train(self) -> Optional[Index]:
+        return self._idx_train
+
     @property
     def X_train(self) -> ndarray:
         if self._X_train is None:
@@ -256,6 +307,10 @@ def y_train(self, value: ndarray):
     def metadata_train(self) -> Optional[DataFrame]:
         return self._metadata_train
 
+    @property
+    def idx_val(self) -> Optional[Index]:
+        return self._idx_val
+
     @property
     def X_val(self) -> ndarray:
         if self._X_val is None:
@@ -282,6 +337,10 @@ def y_val(self, value: ndarray):
     def metadata_val(self) -> Optional[DataFrame]:
         return self._metadata_val
 
+    @property
+    def idx_test(self) -> Optional[Index]:
+        return self._idx_test
+
     @property
     def X_test(self) -> ndarray:
         if self._X_test is None:
@@ -330,8 +389,38 @@ def _construct_provenance(self, groupings: Optional[ndarray] = None) -> None:
         # provenance = np.expand_dims(groupings, axis=(1, 2, 3))
         # self._provenance = np.pad(provenance, pad_width=((0, 0), (0, 0), (0, 0), (0, 1)))
 
-    def apply(self, pipeline: Pipeline) -> "Dataset":
-        return _apply_pipeline(dataset=self, pipeline=pipeline)
+    @classmethod
+    def construct_apply_cache(
+        cls: Type["Dataset"],
+        pipeline: Pipeline,
+        cache_dir: str = DEFAULT_CACHE_DIR,
+        batch_size: int = DEFAULT_BATCH_SIZE,
+    ) -> None:
+        # Subclasses that support apply-caching override this method. It is deliberately not marked
+        # @abstractmethod so that datasets without a cache implementation remain instantiable.
+        raise NotImplementedError()
+
+    def query_cached_apply(
+        self, pipeline: Pipeline, cache_dir: str = DEFAULT_CACHE_DIR, inplace: bool = False
+    ) -> Optional["Dataset"]:
+        targetdir = os.path.join(cache_dir, type(self).__name__, str(pipeline))
+        if not os.path.isdir(targetdir) or self._idx_train is None or self._idx_val is None or self._idx_test is None:
+            return None
+
+        # The train and validation splits are both subsampled from the original training set, so both are
+        # served from the cached "train" features; only the test split is served from the "test" features.
+        result = self if inplace else deepcopy(self)
+        result._X_train = load_cached_features(self._idx_train.to_numpy(), targetdir=targetdir, name="train")
+        result._X_val = load_cached_features(self._idx_val.to_numpy(), targetdir=targetdir, name="train")
+        result._X_test = load_cached_features(self._idx_test.to_numpy(), targetdir=targetdir, name="test")
+        return result
+
+    def apply(self, pipeline: Pipeline, cache_dir: str = DEFAULT_CACHE_DIR, inplace: bool = False) -> "Dataset":
+        result = self.query_cached_apply(pipeline=pipeline, cache_dir=cache_dir, inplace=inplace)
+        if result is None:
+            # Cache miss: fall back to running the pipeline directly.
+            result = _apply_pipeline(dataset=self, pipeline=pipeline)
+            if inplace:
+                self._X_train = result._X_train
+                self._X_val = result._X_val
+                self._X_test = result._X_test
+                result = self
+        return result
 
     def __hash_string__(self) -> str:
         myclass: Type[Dataset] = type(self)
@@ -903,6 +992,8 @@ def preload(cls) -> None:
         fetch_openml(data_id=1590, as_frame=False, data_home=DEFAULT_DATA_DIR, return_X_y=True)
 
     def load(self) -> None:
+        X: ndarray
+        y: ndarray
         X, y = fetch_openml(data_id=1590, as_frame=False, data_home=DEFAULT_DATA_DIR, return_X_y=True)
         X = np.nan_to_num(X)  # TODO: Maybe leave nan values.
y = np.array(y == ">50K", dtype=int) @@ -922,12 +1013,17 @@ def load(self) -> None: valsize = self.valsize if self.valsize > 0 else None testsize = self.testsize if self.testsize > 0 else None valtestsize = valsize + testsize if valsize is not None and testsize is not None else None - self._X_train, self._X_val, self._y_train, self._y_val = train_test_split( - X, y, train_size=trainsize, test_size=valtestsize, random_state=self._seed + totsize = X.shape[0] + idx_train, idx_test_val = train_test_split( + np.arange(totsize), train_size=trainsize, test_size=valtestsize, random_state=self._seed ) - self._X_val, self._X_test, self._y_val, self._y_test = train_test_split( - self._X_val, self._y_val, train_size=valsize, test_size=testsize, random_state=self._seed + idx_val, idx_test = train_test_split( + idx_test_val, train_size=valsize, test_size=testsize, random_state=self._seed ) + self._X_train, self._y_train = X[idx_train], y[idx_train] + self._X_val, self._y_val = X[idx_val], y[idx_val] + self._X_test, self._y_test = X[idx_test], y[idx_test] + self._idx_train, self._idx_val, self._idx_test = Index(idx_train), Index(idx_val), Index(idx_test) self._loaded = True assert self._X_train is not None and self._X_val is not None and self._X_test is not None @@ -966,6 +1062,7 @@ def load_biased( self._X_train, self._y_train = X[idx_train], y[idx_train] self._X_val, self._y_val = X[idx_val], y[idx_val] self._X_test, self._y_test = X[idx_test], y[idx_test] + self._idx_train, self._idx_val, self._idx_test = Index(idx_train), Index(idx_val), Index(idx_test) assert self._X_train is not None and self._X_val is not None and self._X_test is not None self._trainsize = self._X_train.shape[0] self._valsize = self._X_val.shape[0] @@ -1090,6 +1187,8 @@ def load(self) -> None: ) states = get_states_for_size(n) acs_data = data_source.get_data(states=states, download=True) + X: ndarray + y: ndarray X, y, _ = ACSIncome.df_to_numpy(acs_data) del acs_data @@ -1097,12 +1196,17 @@ def load(self) -> None: valsize = self.valsize if self.valsize > 0 else None testsize = self.testsize if self.testsize > 0 else None valtestsize = valsize + testsize if valsize is not None and testsize is not None else None - self._X_train, self._X_val, self._y_train, self._y_val = train_test_split( - X, y, train_size=trainsize, test_size=valtestsize, random_state=self._seed + totsize = X.shape[0] + idx_train, idx_test_val = train_test_split( + np.arange(totsize), train_size=trainsize, test_size=valtestsize, random_state=self._seed ) - self._X_val, self._X_test, self._y_val, self._y_test = train_test_split( - self._X_val, self._y_val, train_size=valsize, test_size=testsize, random_state=self._seed + idx_val, idx_test = train_test_split( + idx_test_val, train_size=valsize, test_size=testsize, random_state=self._seed ) + self._X_train, self._y_train = X[idx_train], y[idx_train] + self._X_val, self._y_val = X[idx_val], y[idx_val] + self._X_test, self._y_test = X[idx_test], y[idx_test] + self._idx_train, self._idx_val, self._idx_test = Index(idx_train), Index(idx_val), Index(idx_test) del X, y self._loaded = True @@ -1149,6 +1253,7 @@ def load_biased( self._X_train, self._y_train = X[idx_train], y[idx_train] self._X_val, self._y_val = X[idx_val], y[idx_val] self._X_test, self._y_test = X[idx_test], y[idx_test] + self._idx_train, self._idx_val, self._idx_test = Index(idx_train), Index(idx_val), Index(idx_test) assert self._X_train is not None and self._X_val is not None and self._X_test is not None self._trainsize = self._X_train.shape[0] 
self._valsize = self._X_val.shape[0] @@ -1205,21 +1310,22 @@ def load(self) -> None: # Select training and test validation examples based on the provided classes. train, test = data["train"], data["test"] assert isinstance(train, datasets.arrow_dataset.Dataset) and isinstance(test, datasets.arrow_dataset.Dataset) - train_val_idx = np.where(np.isin(np.array(train["label"]), self._classes))[0] - test_idx = np.where(np.isin(np.array(test["label"]), self._classes))[0] + idx_train_val = np.where(np.isin(np.array(train["label"]), self._classes))[0] + idx_test = np.where(np.isin(np.array(test["label"]), self._classes))[0] # Produce random samples of the training, validation and test sets based on the provided set sizes. trainsize = self.trainsize if self.trainsize > 0 else None valsize = self.valsize if self.valsize > 0 else None - train_idx, val_idx = train_test_split( - train_val_idx, train_size=trainsize, test_size=valsize, random_state=self._seed + idx_train, idx_val = train_test_split( + idx_train_val, train_size=trainsize, test_size=valsize, random_state=self._seed ) if self.testsize > 0: random = np.random.RandomState(seed=self._seed) - test_idx = random.choice(test_idx, size=self.testsize, replace=False) - train_subset = train.select(train_idx) - val_subset = train.select(val_idx) - test_subset = test.select(test_idx) + idx_test = random.choice(idx_test, size=self.testsize, replace=False) + train_subset = train.select(idx_train) + val_subset = train.select(idx_val) + test_subset = test.select(idx_test) + self._idx_train, self._idx_val, self._idx_test = Index(idx_train), Index(idx_val), Index(idx_test) # Extract features. self._X_train = np.stack(train_subset["image"]) @@ -1240,46 +1346,107 @@ def load(self) -> None: self._testsize = self._X_test.shape[0] self._construct_provenance() + @classmethod + def construct_apply_cache( + cls: Type["Dataset"], + pipeline: Pipeline, + cache_dir: str = DEFAULT_CACHE_DIR, + batch_size: int = DEFAULT_BATCH_SIZE, + ) -> None: + # Determine and create the target directory. If it already exists, remove it first. 
+        targetdir = os.path.join(cache_dir, cls.__name__, str(pipeline))
+        if os.path.isdir(targetdir):
+            shutil.rmtree(targetdir)
+        os.makedirs(targetdir)
+
+        data = datasets.load_dataset("fashion_mnist", cache_dir=DEFAULT_DATA_DIR)
+        train, test = data["train"], data["test"]
+        assert isinstance(train, datasets.arrow_dataset.Dataset) and isinstance(test, datasets.arrow_dataset.Dataset)
+        X_train, y_train = np.stack(train["image"]), np.array(train["label"], dtype=int)
+        X_test = np.stack(test["image"])
+        pipeline = deepcopy(pipeline)
+        pipeline.fit(X_train, y_train)
+        X_train_transformed = batched_pipeline_transform(
+            X_train, pipeline, batch_size=batch_size, desc="Train data examples"
+        )
+        save_cached_features(X_train_transformed, targetdir=targetdir, name="train")
+        X_test_transformed = batched_pipeline_transform(
+            X_test, pipeline, batch_size=batch_size, desc="Test data examples"
+        )
+        save_cached_features(X_test_transformed, targetdir=targetdir, name="test")
+
 
 class TwentyNewsGroups(NoisyLabelDataset, TextDatasetMixin):
     @classmethod
     def preload(cls) -> None:
         categories = ["comp.graphics", "sci.med"]
-        fetch_20newsgroups(subset="train", categories=categories, shuffle=True, data_home=DEFAULT_DATA_DIR)
-        fetch_20newsgroups(subset="test", categories=categories, shuffle=True, data_home=DEFAULT_DATA_DIR)
+        fetch_20newsgroups(subset="train", categories=categories, shuffle=False, data_home=DEFAULT_DATA_DIR)
+        fetch_20newsgroups(subset="test", categories=categories, shuffle=False, data_home=DEFAULT_DATA_DIR)
 
     def load(self) -> None:
         categories = ["comp.graphics", "sci.med"]
-        train = fetch_20newsgroups(
-            subset="train", categories=categories, shuffle=True, random_state=self._seed, data_home=DEFAULT_DATA_DIR
-        )
-        test = fetch_20newsgroups(
-            subset="test", categories=categories, shuffle=True, random_state=self._seed, data_home=DEFAULT_DATA_DIR
-        )
+        train = fetch_20newsgroups(subset="train", categories=categories, shuffle=False, data_home=DEFAULT_DATA_DIR)
+        test = fetch_20newsgroups(subset="test", categories=categories, shuffle=False, data_home=DEFAULT_DATA_DIR)
 
-        # Load the train and validaiton data by splitting the original training dataset.
-        self._X_train, self._y_train = np.array(train.data), np.array(train.target)
+        # Load the train and validation data by splitting the original training dataset. Load the test data.
+ X, y = np.array(train.data), np.array(train.target) + self._X_test, self._y_test = np.array(test.data), np.array(test.target) trainsize = self.trainsize if self.trainsize > 0 else None valsize = self.valsize if self.valsize > 0 else None - totsize = self._X_train.shape[0] + totsize = X.shape[0] if trainsize is not None and valsize is not None and trainsize + valsize > totsize: valsize = totsize - trainsize - self._X_train, self._X_val, self._y_train, self._y_val = train_test_split( - self._X_train, self._y_train, train_size=trainsize, test_size=valsize, random_state=self._seed + idx_train, idx_val = train_test_split( + np.arange(totsize), train_size=trainsize, test_size=valsize, random_state=self._seed ) + idx_test = np.arange(self._X_test.shape[0]) + if self._testsize > 0: + random = np.random.RandomState(seed=self._seed) + idx_test = random.choice(idx_test, size=self._testsize, replace=False) + self._X_test, self._y_test = self._X_test[idx_test], self._y_test[idx_test] + + self._X_train, self._y_train = X[idx_train], y[idx_train] + self._X_val, self._y_val = X[idx_val], y[idx_val] + self._idx_train, self._idx_val, self._idx_test = Index(idx_train), Index(idx_val), Index(idx_test) - # Load the test data. - self._X_test, self._y_test = np.array(test.data), np.array(test.target) - if self.testsize > 0: - self._X_test, self._y_test = self._X_test[: self.testsize], self._y_test[: self.testsize] self._loaded = True - assert self._X_train is not None and self._X_val is not None + assert self._X_train is not None and self._X_val is not None and self._X_test is not None self._trainsize = self._X_train.shape[0] self._valsize = self._X_val.shape[0] self._testsize = self._X_test.shape[0] self._construct_provenance() + @classmethod + def construct_apply_cache( + cls: Type["Dataset"], + pipeline: Pipeline, + cache_dir: str = DEFAULT_CACHE_DIR, + batch_size: int = DEFAULT_BATCH_SIZE, + ) -> None: + # Determine and create the target directory. If it already exists, remove it first. 
+ targetdir = os.path.join(cache_dir, cls.__name__, str(pipeline)) + if os.path.isdir(targetdir): + shutil.rmtree(targetdir) + os.makedirs(targetdir) + + categories = ["comp.graphics", "sci.med"] + train = fetch_20newsgroups(subset="train", categories=categories, shuffle=False, data_home=DEFAULT_DATA_DIR) + test = fetch_20newsgroups(subset="test", categories=categories, shuffle=False, data_home=DEFAULT_DATA_DIR) + X_train, y_train = np.array(train.data), np.array(train.target) + X_test = np.array(test.data) + + pipeline = deepcopy(pipeline) + pipeline.fit(X_train, y_train) + X_train_transformed = batched_pipeline_transform( + X_train, pipeline, batch_size=batch_size, desc="Train data examples" + ) + save_cached_features(X_train_transformed, targetdir=targetdir, name="train") + X_test_transformed = batched_pipeline_transform( + X_test, pipeline, batch_size=batch_size, desc="Test data examples" + ) + save_cached_features(X_test_transformed, targetdir=targetdir, name="test") + class Higgs(NoisyLabelDataset, TabularDatasetMixin): TRAINSIZES = [1000, 10000, 100000, 1000000] @@ -1336,13 +1503,22 @@ def load(self) -> None: df_ts = pd.read_csv(filename_ts, compression="gzip", header=None) y, X = df_tr.iloc[:, 0].to_numpy(dtype=int), df_tr.iloc[:, 1:].to_numpy() - self._X_train, self._X_val, self._y_train, self._y_val = train_test_split( - X, y, train_size=trainsize, test_size=valsize, random_state=self._seed + totsize = X.shape[0] + idx_train, idx_val = train_test_split( + np.arange(totsize), train_size=trainsize, test_size=valsize, random_state=self._seed ) + self._X_train, self._y_train = X[idx_train], y[idx_train] + self._X_val, self._y_val = X[idx_val], y[idx_val] + self._idx_train, self._idx_val = Index(idx_train), Index(idx_val) - if testsize is not None: - df_ts = df_ts.sample(n=self.testsize, random_state=self._seed) - self._y_test, self._X_test = df_ts.iloc[:, 0].to_numpy(dtype=int), df_ts.iloc[:, 1:].to_numpy() + y_test, X_test = df_ts.iloc[:, 0].to_numpy(dtype=int), df_ts.iloc[:, 1:].to_numpy() + random = np.random.RandomState(seed=self._seed) + idx_test = np.arange(X_test.shape[0]) + if self.testsize > 0: + idx_test = random.permutation(self.testsize) + self._X_test = X_test[idx_test, :] + self._y_test = y_test[idx_test] + self._idx_test = Index(idx_test) assert self._X_train is not None and self._X_val is not None and self._X_test is not None self._loaded = True @@ -1492,6 +1668,9 @@ def load(self) -> None: self._y_train_dirty = np.load(os.path.join(self.DATA_DIR, "y_train_dirty.npy")) self._y_val = np.load(os.path.join(self.DATA_DIR, "y_val.npy")) self._y_test = np.load(os.path.join(self.DATA_DIR, "y_test.npy")) + self._idx_train = Index(np.arange(self.trainsize)) + self._idx_val = Index(np.arange(self.valsize)) + self._idx_test = Index(np.arange(self.testsize)) assert self._X_train is not None assert self._X_val is not None @@ -1508,14 +1687,17 @@ def load(self) -> None: self._X_train = self._X_train[idx, :] self._y_train = self._y_train[idx] self._y_train_dirty = self._y_train_dirty[idx] + self._idx_train = Index(idx) if self.valsize > 0: idx = random.permutation(self.valsize) self._X_val = self._X_val[idx, :] self._y_val = self._y_val[idx] + self._idx_val = Index(idx) if self.testsize > 0: idx = random.permutation(self.testsize) self._X_test = self._X_test[idx, :] self._y_test = self._y_test[idx] + self._idx_test = Index(idx) self._loaded = True assert self._X_train is not None and self._X_val is not None @@ -1573,7 +1755,7 @@ def load(self) -> None: ) y_train = 
np.concatenate([batch["labels"] for batch in train_batches]) X_test = test_batch["data"].reshape((-1, 3, 32, 32)).transpose((0, 2, 3, 1)) - y_test = test_batch["labels"] + y_test = np.array(test_batch["labels"]) # Load the CIFAR-N noisy labels. noisylabels = torch.load(os.path.join(self.CIFAR_N_DATA_DIR, self.CIFAR_10N_FILE)) @@ -1583,23 +1765,28 @@ def load(self) -> None: trainsize = self.trainsize if self.trainsize > 0 else None valsize = self.valsize if self.valsize > 0 else None totsize = X_train.shape[0] - self._X_train, self._X_val, idx_train, idx_val = train_test_split( - X_train, + idx_train, idx_val = train_test_split( np.arange(totsize), train_size=trainsize, test_size=valsize, random_state=self._seed, stratify=y_train, ) + self._X_train = X_train[idx_train] + self._X_val = X_train[idx_val] self._y_train = y_train[idx_train] self._y_train_dirty = y_train_dirty[idx_train] self._y_val = y_train[idx_val] + self._idx_train, self._idx_val = Index(idx_train), Index(idx_val) # Select the test dataset. - self._X_test, self._y_test = X_test, y_test + random = np.random.RandomState(seed=self._seed) + idx_test = np.arange(X_test.shape[0]) if self.testsize > 0: - assert self._X_test is not None and self._y_test is not None - self._X_test, self._y_test = self._X_test[: self.testsize], self._y_test[: self.testsize] + idx_test = random.permutation(self.testsize) + self._X_test = X_test[idx_test, :] + self._y_test = y_test[idx_test] + self._idx_test = Index(idx_test) assert self._X_train is not None and self._X_val is not None and self._X_test is not None self._trainsize = self._X_train.shape[0] @@ -1608,6 +1795,39 @@ def load(self) -> None: self._loaded = True self._construct_provenance() + @classmethod + def construct_apply_cache( + cls: Type["CifarN"], + pipeline: Pipeline, + cache_dir: str = DEFAULT_CACHE_DIR, + batch_size: int = DEFAULT_BATCH_SIZE, + ) -> None: + # Determine and create the target directory. If it already exists, remove it first. 
+ targetdir = os.path.join(cache_dir, cls.__name__, str(pipeline)) + if os.path.isdir(targetdir): + shutil.rmtree(targetdir) + os.makedirs(targetdir) + + train_filepaths = [os.path.join(cls.CIFAR_10_DATA_DIR, batch) for batch in cls.CIFAR_10_TRAIN_FILES] + train_batches = [unpickle(filepath) for filepath in train_filepaths] + test_batch = unpickle(os.path.join(cls.CIFAR_10_DATA_DIR, cls.CIFAR_10_TEST_FILE)) + X_train = np.concatenate( + [batch["data"].reshape((-1, 3, 32, 32)).transpose((0, 2, 3, 1)) for batch in train_batches] + ) + y_train = np.concatenate([batch["labels"] for batch in train_batches]) + X_test = test_batch["data"].reshape((-1, 3, 32, 32)).transpose((0, 2, 3, 1)) + + pipeline = deepcopy(pipeline) + pipeline.fit(X_train, y_train) + X_train_transformed = batched_pipeline_transform( + X_train, pipeline, batch_size=batch_size, desc="Train data examples" + ) + save_cached_features(X_train_transformed, targetdir=targetdir, name="train") + X_test_transformed = batched_pipeline_transform( + X_test, pipeline, batch_size=batch_size, desc="Test data examples" + ) + save_cached_features(X_test_transformed, targetdir=targetdir, name="test") + def preload_datasets(**kwargs) -> None: for cls in tqdm(Dataset.datasets.values()): diff --git a/experiments/datascope/experiments/main.py b/experiments/datascope/experiments/main.py index b1c2472..80a9f78 100644 --- a/experiments/datascope/experiments/main.py +++ b/experiments/datascope/experiments/main.py @@ -5,8 +5,9 @@ from typing import Any, Optional -from .base import run, run_scenario, report -from .datasets import preload_datasets +from .base import run, run_scenario, report, cache_pipeline +from .datasets import preload_datasets, DEFAULT_BATCH_SIZE, DEFAULT_CACHE_DIR, Dataset +from .pipelines import Pipeline from .scenarios import ( Scenario, Report, @@ -265,8 +266,39 @@ def main(): single_instance=False, ) - subparsers.add_parser("dataload") + subparsers.add_parser("preload-datasets") + parser_cache_pipeline = subparsers.add_parser("cache-pipeline") + + parser_cache_pipeline.add_argument( + "-d", + "--dataset", + type=str, + choices=Dataset.datasets.keys(), + help="The target dataset for which to construct the pipeline cache.", + ) + + parser_cache_pipeline.add_argument( + "-p", + "--pipeline", + type=str, + choices=Pipeline.pipelines.keys(), + help="The pipeline to run over the target dataset.", + ) + + parser_cache_pipeline.add_argument( + "--cache-dir", + type=str, + default=DEFAULT_CACHE_DIR, + help="The base directory in which pipeline output data will be stored.", + ) + + parser_cache_pipeline.add_argument( + "--batch-size", + type=int, + default=DEFAULT_BATCH_SIZE, + help="The maximum size of a single batch of data that will be passed through the pipeline.", + ) args = parser.parse_args() kwargs = vars(args) @@ -276,7 +308,9 @@ def main(): run_scenario(**kwargs) elif args.command == "report": report(**kwargs) - elif args.command == "dataload": + elif args.command == "preload-datasets": preload_datasets(**kwargs) + elif args.command == "cache-pipeline": + cache_pipeline(**kwargs) else: parser.print_help() diff --git a/experiments/datascope/experiments/pipelines/base.py b/experiments/datascope/experiments/pipelines/base.py index bf182fa..85862ba 100644 --- a/experiments/datascope/experiments/pipelines/base.py +++ b/experiments/datascope/experiments/pipelines/base.py @@ -71,6 +71,9 @@ def __hash_string__(self) -> str: def __repr__(self, N_CHAR_MAX=700): return "%s.%s()" % (type(self).__module__, type(self).__name__) + def 
__str__(self) -> str: + return "%s.%s" % (type(self).__module__, type(self).__name__) + @property def modalities(self) -> Iterable[DatasetModality]: return self._modalities diff --git a/experiments/datascope/experiments/scenarios/data_discard.py b/experiments/datascope/experiments/scenarios/data_discard.py index e5ad690..3415f92 100755 --- a/experiments/datascope/experiments/scenarios/data_discard.py +++ b/experiments/datascope/experiments/scenarios/data_discard.py @@ -202,6 +202,9 @@ def _run(self, progress_bar: bool = True, **kwargs: Any) -> None: # provenance = np.pad(provenance, pad_width=((0, 0), (0, 0), (0, 0), (0, 1))) # provenance = binarize(provenance) + if self.eager_preprocessing: + dataset.apply(pipeline, cache_dir=self.pipeline_cache_dir, inplace=True) + # Initialize the model and utility. model_type = MODEL_TYPES[self.model] model_kwargs = MODEL_KWARGS[self.model] @@ -268,7 +271,7 @@ def _run(self, progress_bar: bool = True, **kwargs: Any) -> None: importance = ShapleyImportance( method=method, utility=target_utility, - pipeline=shapley_pipeline, + pipeline=None if self.eager_preprocessing else shapley_pipeline, mc_iterations=mc_iterations, mc_timeout=self.mc_timeout, mc_tolerance=self.mc_tolerance, @@ -287,7 +290,7 @@ def _run(self, progress_bar: bool = True, **kwargs: Any) -> None: # argsorted_importances = np.ma.array(importances, mask=discarded_units).argsort() # Run the model to get initial score. - dataset_f = dataset.apply(pipeline) + dataset_f = dataset if self.eager_preprocessing else dataset.apply(pipeline, cache_dir=self.pipeline_cache_dir) eqodds: Optional[float] = None if eqodds_utility is not None: eqodds = eqodds_utility(dataset_f.X_train, dataset_f.y_train, dataset_f.X_test, dataset_f.y_test) @@ -348,7 +351,7 @@ def _run(self, progress_bar: bool = True, **kwargs: Any) -> None: # discarded_units[target_units] = True # Run the model. 
- dataset_current_f = dataset_current.apply(pipeline) + dataset_current_f = dataset_current.apply(pipeline, cache_dir=self.pipeline_cache_dir) if eqodds_utility is not None: eqodds = eqodds_utility( dataset_current_f.X_train, diff --git a/experiments/datascope/experiments/scenarios/datascope_scenario.py b/experiments/datascope/experiments/scenarios/datascope_scenario.py index d004f64..be2e357 100755 --- a/experiments/datascope/experiments/scenarios/datascope_scenario.py +++ b/experiments/datascope/experiments/scenarios/datascope_scenario.py @@ -15,6 +15,7 @@ DEFAULT_VALSIZE, DEFAULT_TESTSIZE, DEFAULT_BIAS_METHOD, + DEFAULT_CACHE_DIR, KEYWORD_REPLACEMENTS as DATASET_KEYWORD_REPLACEMENTS, ) from ..pipelines import Pipeline, ModelType, MODEL_KEYWORD_REPLACEMENTS @@ -254,6 +255,8 @@ def __init__( valsize: int = DEFAULT_VALSIZE, testsize: int = DEFAULT_TESTSIZE, augment_factor: int = DEFAULT_AUGMENT_FACTOR, + eager_preprocessing: bool = False, + pipeline_cache_dir: str = DEFAULT_CACHE_DIR, mc_timeout: int = DEFAULT_MC_TIMEOUT, mc_tolerance: float = DEFAULT_MC_TOLERANCE, nn_k: int = DEFAULT_NN_K, @@ -279,6 +282,8 @@ def __init__( self._valsize = valsize self._testsize = testsize self._augment_factor = augment_factor + self._eager_preprocessing = eager_preprocessing + self._pipeline_cache_dir = pipeline_cache_dir self._mc_timeout = mc_timeout self._mc_tolerance = mc_tolerance self._nn_k = nn_k @@ -353,6 +358,16 @@ def augment_factor(self) -> int: """The augmentation factor to apply to the dataset after loading it (if applicable).""" return self._augment_factor + @attribute(domain=[None]) + def eager_preprocessing(self) -> bool: + """Training data is passed through the preprocessing pipeline before being passed to importance computation.""" + return self._eager_preprocessing + + @attribute + def pipeline_cache_dir(self) -> str: + """The directory where the pipeline cache is stored.""" + return self._pipeline_cache_dir + @attribute def mc_timeout(self) -> int: """The maximum time in seconds that a Monte-Carlo importance method is allowed to run. Zero means no timeout.""" diff --git a/experiments/datascope/experiments/scenarios/label_repair.py b/experiments/datascope/experiments/scenarios/label_repair.py index 5825aea..b17f14f 100755 --- a/experiments/datascope/experiments/scenarios/label_repair.py +++ b/experiments/datascope/experiments/scenarios/label_repair.py @@ -235,6 +235,10 @@ def _run(self, progress_bar: bool = True, **kwargs: Any) -> None: # provenance = np.pad(provenance, pad_width=((0, 0), (0, 0), (0, 0), (0, 1))) # provenance = binarize(provenance) + if self.eager_preprocessing: + dataset.apply(pipeline, cache_dir=self.pipeline_cache_dir, inplace=True) + dataset_dirty.apply(pipeline, cache_dir=self.pipeline_cache_dir, inplace=True) + # Initialize the model and utility. model_type = MODEL_TYPES[self.model] model_kwargs = MODEL_KWARGS[self.model] @@ -301,7 +305,7 @@ def _run(self, progress_bar: bool = True, **kwargs: Any) -> None: importance = ShapleyImportance( method=method, utility=target_utility, - pipeline=shapley_pipeline, + pipeline=None if self.eager_preprocessing else shapley_pipeline, mc_iterations=mc_iterations, mc_timeout=self.mc_timeout, mc_tolerance=self.mc_tolerance, @@ -328,8 +332,12 @@ def _run(self, progress_bar: bool = True, **kwargs: Any) -> None: # Run the model to get initial score. 
# assert y_val is not None - dataset_f = dataset.apply(pipeline) - dataset_dirty_f = dataset_dirty.apply(pipeline) + dataset_f = dataset if self.eager_preprocessing else dataset.apply(pipeline, cache_dir=self.pipeline_cache_dir) + dataset_dirty_f = ( + dataset_dirty + if self.eager_preprocessing + else dataset_dirty.apply(pipeline, cache_dir=self.pipeline_cache_dir) + ) eqodds: Optional[float] = None if eqodds_utility is not None: eqodds = eqodds_utility( diff --git a/experiments/requirements.txt b/experiments/requirements.txt index acf0433..04e72e0 100644 --- a/experiments/requirements.txt +++ b/experiments/requirements.txt @@ -12,6 +12,7 @@ datasets >= 2.8 transformers >= 2.8 sentence-transformers >= 2.2 joblib >= 1.2 +tables >= 3.9 # Pipelines dependencies # torchvision >= 0.11
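
Usage notes (sketches, not part of the diff). End to end, the intended flow is to populate the cache once per dataset/pipeline pair and then let Dataset.apply() serve transformed features from it. A minimal sketch, assuming the chosen dataset implements construct_apply_cache (in this patch: FashionMNIST, TwentyNewsGroups and CifarN) and that the chosen pipeline is compatible with its modality; the registry keys are looked up generically here rather than hard-coded, since the concrete key names are defined elsewhere in the codebase:

    from datascope.experiments.base import cache_pipeline
    from datascope.experiments.datasets import DEFAULT_CACHE_DIR, Dataset
    from datascope.experiments.pipelines import Pipeline

    # Arbitrary registry keys for illustration; on the new CLI these are the
    # values of `cache-pipeline -d <dataset> -p <pipeline>`.
    dataset_key = next(iter(Dataset.datasets))
    pipeline_key = next(iter(Pipeline.pipelines))

    # One-off: fit the pipeline on the full training set and persist transformed
    # features for every example under <cache_dir>/<DatasetClass>/<str(pipeline)>.
    cache_pipeline(dataset=dataset_key, pipeline=pipeline_key, cache_dir=DEFAULT_CACHE_DIR)

    # Afterwards, apply() is served from the cache via query_cached_apply() whenever
    # the cache directory exists and the split indices are available; it falls back
    # to running the pipeline otherwise.
    dataset = Dataset.datasets[dataset_key]()
    dataset.load()
    pipeline = Pipeline.pipelines[pipeline_key].construct(dataset=dataset)
    dataset_f = dataset.apply(pipeline, cache_dir=DEFAULT_CACHE_DIR)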
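
At a lower level, the new HDF5 helpers can be exercised in isolation. A minimal round-trip sketch, assuming the experiments package is importable; the scratch directory is hypothetical, and save_cached_features expects it to already exist:

    import os
    import numpy as np
    from datascope.experiments.datasets import load_cached_features, save_cached_features

    # Hypothetical scratch location.
    targetdir = os.path.join("var", "data", "applycache", "demo")
    os.makedirs(targetdir, exist_ok=True)

    # Persist a feature matrix as <targetdir>/train.hdf5, a single-column "apply_cache" table.
    X = np.random.RandomState(seed=0).rand(100, 32)
    save_cached_features(X, targetdir=targetdir, name="train")

    # Read back an arbitrary subset of rows by coordinate, as query_cached_apply() does.
    idx = np.array([0, 5, 42])
    X_cached = load_cached_features(idx, targetdir=targetdir, name="train")
    assert np.allclose(X_cached, X[idx])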