Introduce caching of pipeline outputs over entire datasets.
bojan-karlas committed Dec 12, 2023
1 parent 078ae26 commit 7c25fbf
Showing 9 changed files with 365 additions and 59 deletions.
12 changes: 12 additions & 0 deletions experiments/datascope/experiments/base.py
@@ -11,6 +11,9 @@
from tqdm import tqdm
from typing import Any, Optional, Sequence, Tuple

from .datasets import DEFAULT_BATCH_SIZE, DEFAULT_CACHE_DIR, Dataset
from .pipelines import Pipeline

from .scenarios import (
Study,
Scenario,
@@ -218,3 +221,12 @@ def report(
with Pool(processes=None, initializer=_init_pool, initargs=(lock,)) as pool:
for result in tqdm(pool.imap_unordered(_report_generator, item_args), desc="Reports", total=len(reports)):
tqdm.write(result)


def cache_pipeline(
dataset: str, pipeline: str, cache_dir: str = DEFAULT_CACHE_DIR, batch_size: int = DEFAULT_BATCH_SIZE, **kwargs
) -> None:
cls_dataset = Dataset.datasets[dataset]
inst_dataset = cls_dataset()
inst_pipeline = Pipeline.pipelines[pipeline].construct(dataset=inst_dataset)
cls_dataset.construct_apply_cache(pipeline=inst_pipeline, cache_dir=cache_dir, batch_size=batch_size)
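
A minimal usage sketch of the new helper follows; the import path and registry keys are assumptions for illustration (real keys come from the Dataset.datasets and Pipeline.pipelines registries, and the same operation is exposed through the new cache-pipeline subcommand added in main.py further down):

# Illustrative sketch only; "some-dataset" and "some-pipeline" are placeholder
# registry keys, and the import path assumes the package is importable as
# datascope.experiments.
from datascope.experiments.base import cache_pipeline

cache_pipeline(
    dataset="some-dataset",    # key into Dataset.datasets
    pipeline="some-pipeline",  # key into Pipeline.pipelines
    cache_dir="var/cache",     # overrides DEFAULT_CACHE_DIR
    batch_size=1024,           # overrides DEFAULT_BATCH_SIZE
)
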
10 changes: 10 additions & 0 deletions experiments/datascope/experiments/datasets/__init__.py
@@ -24,7 +24,12 @@
DEFAULT_BIAS_METHOD,
KEYWORD_REPLACEMENTS,
DEFAULT_AUGMENT_FACTOR,
DEFAULT_CACHE_DIR,
DEFAULT_BATCH_SIZE,
preload_datasets,
batched_pipeline_transform,
save_cached_features,
load_cached_features,
)

__all__ = [
@@ -54,5 +59,10 @@
"DEFAULT_BIAS_METHOD",
"KEYWORD_REPLACEMENTS",
"DEFAULT_AUGMENT_FACTOR",
"DEFAULT_CACHE_DIR",
"DEFAULT_BATCH_SIZE",
"preload_datasets",
"batched_pipeline_transform",
"save_cached_features",
"load_cached_features",
]
318 changes: 269 additions & 49 deletions experiments/datascope/experiments/datasets/base.py

Large diffs are not rendered by default.
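
Since this diff is not shown, the following is only a rough sketch of what the exported helpers plausibly do, inferred from the names added to datasets/__init__.py and the new tables (PyTables) dependency; the real signatures and storage format in datasets/base.py may differ:

# Assumed sketch, not the actual implementation from this commit.
import numpy as np
import pandas as pd


def batched_pipeline_transform(pipeline, X, batch_size):
    # Push X through the (fitted) pipeline in fixed-size batches to bound memory use.
    outputs = [pipeline.transform(X[i : i + batch_size]) for i in range(0, len(X), batch_size)]
    return np.concatenate(outputs, axis=0)


def save_cached_features(features, path, key="features"):
    # Persist computed features to an HDF5 table (requires PyTables).
    pd.DataFrame(features).to_hdf(path, key=key, mode="w", format="table")


def load_cached_features(path, key="features"):
    # Load previously cached features back as a NumPy array.
    return pd.read_hdf(path, key=key).to_numpy()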

42 changes: 38 additions & 4 deletions experiments/datascope/experiments/main.py
@@ -5,8 +5,9 @@

from typing import Any, Optional

from .base import run, run_scenario, report
from .datasets import preload_datasets
from .base import run, run_scenario, report, cache_pipeline
from .datasets import preload_datasets, DEFAULT_BATCH_SIZE, DEFAULT_CACHE_DIR, Dataset
from .pipelines import Pipeline
from .scenarios import (
Scenario,
Report,
@@ -265,8 +266,39 @@ def main():
single_instance=False,
)

subparsers.add_parser("dataload")
subparsers.add_parser("preload-datasets")

parser_cache_pipeline = subparsers.add_parser("cache-pipeline")

parser_cache_pipeline.add_argument(
"-d",
"--dataset",
type=str,
choices=Dataset.datasets.keys(),
help="The target dataset for which to construct the pipeline cache.",
)

parser_cache_pipeline.add_argument(
"-p",
"--pipeline",
type=str,
choices=Pipeline.pipelines.keys(),
help="The pipeline to run over the target dataset.",
)

parser_cache_pipeline.add_argument(
"--cache-dir",
type=str,
default=DEFAULT_CACHE_DIR,
help="The base directory in which pipeline output data will be stored.",
)

parser_cache_pipeline.add_argument(
"--batch-size",
type=int,
default=DEFAULT_BATCH_SIZE,
help="The maximum size of a single batch of data that will be passed through the pipeline.",
)
args = parser.parse_args()
kwargs = vars(args)

@@ -276,7 +308,9 @@ def main():
run_scenario(**kwargs)
elif args.command == "report":
report(**kwargs)
elif args.command == "dataload":
elif args.command == "preload-datasets":
preload_datasets(**kwargs)
elif args.command == "cache-pipeline":
cache_pipeline(**kwargs)
else:
parser.print_help()
3 changes: 3 additions & 0 deletions experiments/datascope/experiments/pipelines/base.py
@@ -71,6 +71,9 @@ def __hash_string__(self) -> str:
def __repr__(self, N_CHAR_MAX=700):
return "%s.%s()" % (type(self).__module__, type(self).__name__)

def __str__(self) -> str:
return "%s.%s" % (type(self).__module__, type(self).__name__)

@property
def modalities(self) -> Iterable[DatasetModality]:
return self._modalities
9 changes: 6 additions & 3 deletions experiments/datascope/experiments/scenarios/data_discard.py
@@ -202,6 +202,9 @@ def _run(self, progress_bar: bool = True, **kwargs: Any) -> None:
# provenance = np.pad(provenance, pad_width=((0, 0), (0, 0), (0, 0), (0, 1)))
# provenance = binarize(provenance)

if self.eager_preprocessing:
dataset.apply(pipeline, cache_dir=self.pipeline_cache_dir, inplace=True)

# Initialize the model and utility.
model_type = MODEL_TYPES[self.model]
model_kwargs = MODEL_KWARGS[self.model]
@@ -268,7 +271,7 @@ def _run(self, progress_bar: bool = True, **kwargs: Any) -> None:
importance = ShapleyImportance(
method=method,
utility=target_utility,
pipeline=shapley_pipeline,
pipeline=None if self.eager_preprocessing else shapley_pipeline,
mc_iterations=mc_iterations,
mc_timeout=self.mc_timeout,
mc_tolerance=self.mc_tolerance,
@@ -287,7 +290,7 @@ def _run(self, progress_bar: bool = True, **kwargs: Any) -> None:
# argsorted_importances = np.ma.array(importances, mask=discarded_units).argsort()

# Run the model to get initial score.
dataset_f = dataset.apply(pipeline)
dataset_f = dataset if self.eager_preprocessing else dataset.apply(pipeline, cache_dir=self.pipeline_cache_dir)
eqodds: Optional[float] = None
if eqodds_utility is not None:
eqodds = eqodds_utility(dataset_f.X_train, dataset_f.y_train, dataset_f.X_test, dataset_f.y_test)
@@ -348,7 +351,7 @@ def _run(self, progress_bar: bool = True, **kwargs: Any) -> None:
# discarded_units[target_units] = True

# Run the model.
dataset_current_f = dataset_current.apply(pipeline)
dataset_current_f = dataset_current.apply(pipeline, cache_dir=self.pipeline_cache_dir)
if eqodds_utility is not None:
eqodds = eqodds_utility(
dataset_current_f.X_train,
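
The change pattern in this scenario (mirrored in label_repair.py below) is: with eager_preprocessing, features are computed and cached once up front and ShapleyImportance receives pipeline=None; otherwise the pipeline is applied lazily with the cache directory passed through. A condensed, hypothetical helper capturing that split:

def featurize(dataset, pipeline, eager, cache_dir):
    # Hypothetical helper condensed from the diff above; not part of the commit.
    if eager:
        # Precompute features once; importance computation then gets pipeline=None.
        dataset.apply(pipeline, cache_dir=cache_dir, inplace=True)
        return dataset, None
    # Lazy: keep the raw dataset and let importance re-apply the pipeline,
    # reusing cached outputs from cache_dir where available.
    return dataset.apply(pipeline, cache_dir=cache_dir), pipeline
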
15 changes: 15 additions & 0 deletions experiments/datascope/experiments/scenarios/datascope_scenario.py
@@ -15,6 +15,7 @@
DEFAULT_VALSIZE,
DEFAULT_TESTSIZE,
DEFAULT_BIAS_METHOD,
DEFAULT_CACHE_DIR,
KEYWORD_REPLACEMENTS as DATASET_KEYWORD_REPLACEMENTS,
)
from ..pipelines import Pipeline, ModelType, MODEL_KEYWORD_REPLACEMENTS
@@ -254,6 +255,8 @@ def __init__(
valsize: int = DEFAULT_VALSIZE,
testsize: int = DEFAULT_TESTSIZE,
augment_factor: int = DEFAULT_AUGMENT_FACTOR,
eager_preprocessing: bool = False,
pipeline_cache_dir: str = DEFAULT_CACHE_DIR,
mc_timeout: int = DEFAULT_MC_TIMEOUT,
mc_tolerance: float = DEFAULT_MC_TOLERANCE,
nn_k: int = DEFAULT_NN_K,
@@ -279,6 +282,8 @@ def __init__(
self._valsize = valsize
self._testsize = testsize
self._augment_factor = augment_factor
self._eager_preprocessing = eager_preprocessing
self._pipeline_cache_dir = pipeline_cache_dir
self._mc_timeout = mc_timeout
self._mc_tolerance = mc_tolerance
self._nn_k = nn_k
@@ -353,6 +358,16 @@ def augment_factor(self) -> int:
"""The augmentation factor to apply to the dataset after loading it (if applicable)."""
return self._augment_factor

@attribute(domain=[None])
def eager_preprocessing(self) -> bool:
"""Training data is passed through the preprocessing pipeline before being passed to importance computation."""
return self._eager_preprocessing

@attribute
def pipeline_cache_dir(self) -> str:
"""The directory where the pipeline cache is stored."""
return self._pipeline_cache_dir

@attribute
def mc_timeout(self) -> int:
"""The maximum time in seconds that a Monte-Carlo importance method is allowed to run. Zero means no timeout."""
14 changes: 11 additions & 3 deletions experiments/datascope/experiments/scenarios/label_repair.py
@@ -235,6 +235,10 @@ def _run(self, progress_bar: bool = True, **kwargs: Any) -> None:
# provenance = np.pad(provenance, pad_width=((0, 0), (0, 0), (0, 0), (0, 1)))
# provenance = binarize(provenance)

if self.eager_preprocessing:
dataset.apply(pipeline, cache_dir=self.pipeline_cache_dir, inplace=True)
dataset_dirty.apply(pipeline, cache_dir=self.pipeline_cache_dir, inplace=True)

# Initialize the model and utility.
model_type = MODEL_TYPES[self.model]
model_kwargs = MODEL_KWARGS[self.model]
@@ -301,7 +305,7 @@ def _run(self, progress_bar: bool = True, **kwargs: Any) -> None:
importance = ShapleyImportance(
method=method,
utility=target_utility,
pipeline=shapley_pipeline,
pipeline=None if self.eager_preprocessing else shapley_pipeline,
mc_iterations=mc_iterations,
mc_timeout=self.mc_timeout,
mc_tolerance=self.mc_tolerance,
@@ -328,8 +332,12 @@ def _run(self, progress_bar: bool = True, **kwargs: Any) -> None:

# Run the model to get initial score.
# assert y_val is not None
dataset_f = dataset.apply(pipeline)
dataset_dirty_f = dataset_dirty.apply(pipeline)
dataset_f = dataset if self.eager_preprocessing else dataset.apply(pipeline, cache_dir=self.pipeline_cache_dir)
dataset_dirty_f = (
dataset_dirty
if self.eager_preprocessing
else dataset_dirty.apply(pipeline, cache_dir=self.pipeline_cache_dir)
)
eqodds: Optional[float] = None
if eqodds_utility is not None:
eqodds = eqodds_utility(
1 change: 1 addition & 0 deletions experiments/requirements.txt
@@ -12,6 +12,7 @@ datasets >= 2.8
transformers >= 2.8
sentence-transformers >= 2.2
joblib >= 1.2
tables >= 3.9

# Pipelines dependencies
# torchvision >= 0.11
