diff --git a/plugins/flytekit-optuna/README.md b/plugins/flytekit-optuna/README.md
index 5e247c227f..932402561b 100644
--- a/plugins/flytekit-optuna/README.md
+++ b/plugins/flytekit-optuna/README.md
@@ -1,45 +1,184 @@
-# Fully Parallelized Flyte Orchestrated Optimizer
+# Fully Parallelized Wrapper Around Optuna Using Flyte

-WIP Flyte integration with Optuna to parallelize optimization objective function runtime.
+## Overview
+
+This plugin provides a fully parallelized Flyte wrapper around Optuna. It leverages Flyte's scalable, distributed workflow orchestration to run Optuna hyperparameter-optimization trials concurrently.
+
+![Timeline](timeline.png)
+
+
+## Features
+
+- **Ease of Use**: This plugin requires no external data storage or experiment tracking.
+- **Parallelized Trial Execution**: Enables concurrent execution of Optuna trials, dramatically speeding up optimization tasks.
+- **Scalability**: Leverages Flyte's ability to scale horizontally to handle large-scale hyperparameter tuning jobs.
+- **Flexible Integration**: Compatible with various machine learning frameworks and training pipelines.
+
+## Installation
+
+- Install `flytekit`
+- Install `flytekitplugins-optuna`
+
+## Getting Started
+
+### Prerequisites
+
+- A Flyte deployment configured and running.
+- Python 3.9 or later.
+- Familiarity with Flyte and asynchronous programming.
+
+### Define the Objective Function
+
+The objective function defines the problem to be optimized. It should accept the hyperparameters to be tuned and return a value to minimize or maximize.

 ```python
 import math
 import flytekit as fl
-from optimizer import Optimizer, suggest
-
-image = fl.ImageSpec(builder="union", packages=["flytekit==1.15.0b0", "optuna>=4.0.0"])
+image = fl.ImageSpec(packages=["flytekitplugins-optuna"])

 @fl.task(container_image=image)
 async def objective(x: float, y: int, z: int, power: int) -> float:
     return math.log((((x - 5) ** 2) + (y + 4) ** 4 + (3 * z - 3) ** 2)) ** power
+```
+
+### Configure the Flyte Workflow
+
+The Flyte workflow orchestrates the parallel execution of Optuna trials. Below is an example:
+
+```python
+import flytekit as fl
+from flytekitplugins.optuna import Optimizer, suggest

 @fl.eager(container_image=image)
-async def train(concurrency: int, n_trials: int):
-    optimizer = Optimizer(objective, concurrency, n_trials)
+async def train(concurrency: int, n_trials: int) -> float:
+
+    optimizer = Optimizer(objective=objective, concurrency=concurrency, n_trials=n_trials)

     await optimizer(
-        x=suggest.float(low=-10, high=10),
-        y=suggest.integer(low=-10, high=10),
-        z=suggest.category([-5, 0, 3, 6, 9]),
-        power=2,
+        x = suggest.float(low=-10, high=10),
+        y = suggest.integer(low=-10, high=10),
+        z = suggest.category([-5, 0, 3, 6, 9]),
+        power = 2,
     )
+
+    return optimizer.study.best_value
+
+```
+
+### Register and Execute the Workflow
+
+Register the code and submit the workflow to Flyte for execution (replace `example.py` with the module that defines `train`):
+
+```bash
+pyflyte register .
+pyflyte run --remote example.py train --concurrency 2 --n_trials 10
 ```

-This integration allows one to define fully parallelized HPO experiments via `@eager` in as little as 20 lines of code. The objective task is optimized via Optuna under the hood, such that one may extract the `optuna.Study` at any time for the purposes of serialization, storage, visualization, or interpretation.
+### Monitor Progress
+
+You can monitor the progress of the trials via the Flyte Console. Each trial runs as a separate task, and the results are aggregated by the Optuna wrapper.
+
+You may access the `optuna.Study` like so: `optimizer.study`.
+
+For example, with `plotly` installed, you may create Flyte Decks of the study like so:
+
+```python
+import flytekit as fl
+import optuna
+import plotly
+
+fig = optuna.visualization.plot_timeline(optimizer.study)
+fl.Deck("timeline", plotly.io.to_html(fig))
+```
+
+## Advanced Configuration
+
+### Custom Dictionary Inputs

-This plugin provides full feature parity to Optuna, including:
+Suggestions may also be defined within nested dictionaries:
+
+```python
+import flytekit as fl
+import optuna
+from flytekitplugins.optuna import Optimizer, suggest
+
+image = fl.ImageSpec(packages=["flytekitplugins-optuna"])
+
+
+@fl.task(container_image=image)
+async def objective(params: dict[str, int | float | str]) -> float:
+    ...
+
+
+@fl.eager(container_image=image)
+async def train(concurrency: int, n_trials: int):
+
+    study = optuna.create_study(direction="maximize")
+
+    optimizer = Optimizer(objective=objective, concurrency=concurrency, n_trials=n_trials, study=study)
+
+    params = {
+        "lambda": suggest.float(1e-8, 1.0, log=True),
+        "alpha": suggest.float(1e-8, 1.0, log=True),
+        "subsample": suggest.float(0.2, 1.0),
+        "colsample_bytree": suggest.float(0.2, 1.0),
+        "max_depth": suggest.integer(3, 9, step=2),
+        "objective": "binary:logistic",
+        "tree_method": "exact",
+        "booster": "dart",
+    }
+
+    await optimizer(params=params)
+```
+
+### Custom Callbacks
+
+In some cases, you may need to define the suggestions programmatically. This may be done by decorating a callback that takes an `optuna.Trial` as its first argument with `optimize`:
+
+```python
+import flytekit as fl
+import optuna
+from flytekitplugins.optuna import optimize
+
+image = fl.ImageSpec(packages=["flytekitplugins-optuna"])
+
+@fl.task(container_image=image)
+async def objective(params: dict[str, int | float | str]) -> float:
+    ...
+
+@fl.eager(container_image=image)
+async def train(concurrency: int, n_trials: int):
+
+    study = optuna.create_study(direction="maximize")
+
+    @optimize(n_trials=n_trials, concurrency=concurrency, study=study)
+    def optimizer(trial: optuna.Trial, verbosity: int, tree_method: str):
+
+        params = {
+            "verbosity": verbosity,
+            "tree_method": tree_method,
+            "objective": "binary:logistic",
+            # defines booster; gblinear is for linear functions.
+            "booster": trial.suggest_categorical("booster", ["gbtree", "gblinear", "dart"]),
+            # sampling ratio of columns for each tree.
+            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.2, 1.0),
+        }
+
+        if params["booster"] in ["gbtree", "dart"]:
+            # maximum depth of the tree; signifies complexity of the tree.
+            params["max_depth"] = trial.suggest_int("max_depth", 3, 9, step=2)
+
+        if params["booster"] == "dart":
+            params["sample_type"] = trial.suggest_categorical("sample_type", ["uniform", "weighted"])
+            params["normalize_type"] = trial.suggest_categorical("normalize_type", ["tree", "forest"])
+
+        return objective(params)
+
+    await optimizer(verbosity=0, tree_method="exact")
+```

-- fixed arguments
-- multiple suggestion types (`Integer`, `Float`, `Category`)
-- multi-objective, with arbitrary objective directions (minimize, maximize)
-- pruners
-- samplers
+
+## Troubleshooting

-# Improvements
+
+- **Resource Constraints**: Ensure sufficient compute resources are allocated for the number of parallel jobs specified.

-- This would synergize really well with Union Actors.
-- This should also support workflows, but it currently does not.
-- Add unit tests, of course.
+- **Flyte Errors**: Refer to the Flyte logs and documentation to debug workflow execution issues.
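+
+## Multi-Objective Studies
+
+The `Optimizer` validates the objective's return annotation against the number of directions in the study, so multi-objective optimization can be expressed by returning a tuple of floats. The snippet below is a minimal, illustrative sketch; the two objectives and their directions are invented for demonstration:
+
+```python
+import flytekit as fl
+import optuna
+from flytekitplugins.optuna import Optimizer, suggest
+
+image = fl.ImageSpec(packages=["flytekitplugins-optuna"])
+
+
+@fl.task(container_image=image)
+async def objective(x: float, y: float) -> tuple[float, float]:
+    # two competing illustrative objectives: keep x**2 small while keeping x + y large
+    return x**2, x + y
+
+
+@fl.eager(container_image=image)
+async def train(concurrency: int, n_trials: int):
+
+    # one direction per value returned by the objective
+    study = optuna.create_study(directions=["minimize", "maximize"])
+
+    optimizer = Optimizer(objective=objective, concurrency=concurrency, n_trials=n_trials, study=study)
+
+    await optimizer(
+        x=suggest.float(low=-10, high=10),
+        y=suggest.float(low=-10, high=10),
+    )
+
+    # multi-objective studies expose their Pareto-optimal trials via best_trials
+    print(optimizer.study.best_trials)
+```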
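+
+## Local Testing
+
+The plugin's unit tests drive the eager workflow directly with `asyncio.run`, which is a convenient way to sanity-check an objective before registering anything. A minimal sketch, reusing the `train` workflow defined in Getting Started:
+
+```python
+import asyncio
+
+# runs the eager workflow, and therefore all of its trials, in the local process
+loss = asyncio.run(train(concurrency=2, n_trials=10))
+print(loss)
+```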
diff --git a/plugins/flytekit-optuna/flytekitplugins/optuna/__init__.py b/plugins/flytekit-optuna/flytekitplugins/optuna/__init__.py
index 55cd84fc8a..9804fb94a8 100644
--- a/plugins/flytekit-optuna/flytekitplugins/optuna/__init__.py
+++ b/plugins/flytekit-optuna/flytekitplugins/optuna/__init__.py
@@ -1,3 +1,3 @@
-from .optimizer import Optimizer, suggest
+from .optimizer import Optimizer, optimize, suggest

-__all__ = ["Optimizer", "suggest"]
+__all__ = ["Optimizer", "optimize", "suggest"]
diff --git a/plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py b/plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py
index 88a4f05f62..8d909d8a31 100644
--- a/plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py
+++ b/plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py
@@ -1,12 +1,14 @@
 import asyncio
 import inspect
+from copy import copy, deepcopy
 from dataclasses import dataclass
 from types import SimpleNamespace
-from typing import Any, Optional, Union
+from typing import Any, Awaitable, Callable, Optional, Union
+
+from typing_extensions import Concatenate, ParamSpec

 import optuna
 from flytekit import PythonFunctionTask
-from flytekit.core.workflow import PythonFunctionWorkflow
 from flytekit.exceptions.eager import EagerException
@@ -45,13 +47,24 @@ class Category(Suggestion):

 suggest = SimpleNamespace(float=Float, integer=Integer, category=Category)

+P = ParamSpec("P")
+
+Result = Union[float, tuple[float, ...]]
+
+CallbackType = Callable[Concatenate[optuna.Trial, P], Union[Awaitable[Result], Result]]
+

 @dataclass
 class Optimizer:
-    objective: Union[PythonFunctionTask, PythonFunctionWorkflow]
+    objective: Union[CallbackType, PythonFunctionTask]
     concurrency: int
     n_trials: int
     study: Optional[optuna.Study] = None
+    delay: int = 0
+
+    @property
+    def is_imperative(self) -> bool:
+        return isinstance(self.objective, PythonFunctionTask)

     def __post_init__(self):
         if self.study is None:
@@ -66,31 +79,36 @@ def __post_init__(self):
         if not isinstance(self.study, optuna.Study):
             raise ValueError("study must be an optuna.Study")

-        # check if the objective function returns the correct number of outputs
-        if isinstance(self.objective, PythonFunctionTask):
-            func = self.objective.task_function
-        elif isinstance(self.objective, PythonFunctionWorkflow):
-            func = self.objective._workflow_function
-        else:
-            raise ValueError("objective must be a PythonFunctionTask or PythonFunctionWorkflow")
+        if not isinstance(self.delay, int):
+            raise ValueError("delay must be an integer")

-        signature = inspect.signature(func)
+        if self.is_imperative:
+            signature = inspect.signature(self.objective.task_function)

-        if signature.return_annotation is float:
-            if len(self.study.directions) != 1:
-                raise ValueError("the study must have a single objective if objective returns a single float")
+            if signature.return_annotation is float:
+                if len(self.study.directions) != 1:
+                    raise ValueError("the study must have a single objective if objective returns a single float")

-        elif isinstance(args := signature.return_annotation.__args__, tuple):
-            if len(args) != len(self.study.directions):
-                raise ValueError("objective must return the same number of directions in the study")
+            elif isinstance(args := signature.return_annotation.__args__, tuple):
+                if len(args) != len(self.study.directions):
+                    raise ValueError("objective must return the same number of directions in the study")

-            if not all(arg is float for arg in args):
+                if not all(arg is float for arg in args):
+                    raise ValueError("objective function must return a float or tuple of floats")
+
+            else:
                 raise ValueError("objective function must return a float or tuple of floats")

         else:
-            raise ValueError("objective function must return a float or tuple of floats")
+            if not callable(self.objective):
+                raise ValueError("objective must be a callable or a PythonFunctionTask")
+
+            signature = inspect.signature(self.objective)
+
+            if "trial" not in signature.parameters:
+                raise ValueError("objective function must have a parameter called 'trial' if not a PythonFunctionTask")

-    async def __call__(self, **inputs: Any):
+    async def __call__(self, **inputs: P.kwargs):
         """
         Asynchronously executes the objective function remotely.

         Parameters:
@@ -101,31 +119,28 @@ async def spawn(self, semaphore: asyncio.Semaphore, **inputs: Any):
         semaphore = asyncio.Semaphore(self.concurrency)

         # create list of async trials
-        trials = [self.spawn(semaphore, **inputs) for _ in range(self.n_trials)]
+        trials = [self.spawn(semaphore, deepcopy(inputs)) for _ in range(self.n_trials)]

         # await all trials to complete
         await asyncio.gather(*trials)

-    async def spawn(self, semaphore: asyncio.Semaphore, **inputs: Any):
+    async def spawn(self, semaphore: asyncio.Semaphore, inputs: dict[str, Any]):
         async with semaphore:
+            await asyncio.sleep(self.delay)
+
             # ask for a new trial
             trial: optuna.Trial = self.study.ask()

-            suggesters = {
-                Float: trial.suggest_float,
-                Integer: trial.suggest_int,
-                Category: trial.suggest_categorical,
-            }
-
-            # suggest inputs for the trial
-            for key, value in inputs.items():
-                if isinstance(value, Suggestion):
-                    suggester = suggesters[type(value)]
-                    inputs[key] = suggester(name=key, **vars(value))
-
             try:
+                result: Union[float, tuple[float, ...]]
+
                 # schedule the trial
-                result: Union[float, tuple[float, ...]] = await self.objective(**inputs)
+                if self.is_imperative:
+                    result = await self.objective(**process(trial, inputs))
+
+                else:
+                    out = self.objective(trial=trial, **inputs)
+                    result = out if not inspect.isawaitable(out) else await out

                 # tell the study the result
                 self.study.tell(trial, result, state=optuna.trial.TrialState.COMPLETE)
@@ -133,3 +148,33 @@ async def spawn(self, semaphore: asyncio.Semaphore, **inputs: Any):
             # if the trial fails, tell the study
             except EagerException:
                 self.study.tell(trial, state=optuna.trial.TrialState.FAIL)
+
+
+def optimize(concurrency: int, n_trials: int, study: Optional[optuna.Study] = None):
+    def decorator(func):
+        return Optimizer(func, concurrency=concurrency, n_trials=n_trials, study=study)
+
+    return decorator
+
+
+def process(trial: optuna.Trial, inputs: dict[str, Any], root: Optional[list[str]] = None) -> dict[str, Any]:
+    if root is None:
+        root = []
+
+    suggesters = {
+        Float: trial.suggest_float,
+        Integer: trial.suggest_int,
+        Category: trial.suggest_categorical,
+    }
+
+    for key, value in inputs.items():
+        path = copy(root) + [key]
+
+        if isinstance(inputs[key], Suggestion):
+            suggester = suggesters[type(value)]
+            inputs[key] = suggester(name=(".").join(path), **vars(value))
+
+        elif isinstance(value, dict):
+            inputs[key] = process(trial=trial, inputs=value, root=path)
+
+    return inputs
diff --git a/plugins/flytekit-optuna/setup.py b/plugins/flytekit-optuna/setup.py
index 82e3e2f880..3bc7285d1f 100644
--- a/plugins/flytekit-optuna/setup.py
+++ b/plugins/flytekit-optuna/setup.py
@@ -4,7 +4,7 @@

 microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

-plugin_requires = ["flytekit>=1.15.0", "optuna>=4.0.0,<5.0.0"]
+plugin_requires = ["flytekit>=1.15.0", "optuna>=4.0.0,<5.0.0", "typing-extensions>=4.10,<5.0"]

 __version__ = "0.0.0+develop"
diff --git a/plugins/flytekit-optuna/tests/__init__.py b/plugins/flytekit-optuna/tests/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/plugins/flytekit-optuna/tests/test_callback.py b/plugins/flytekit-optuna/tests/test_callback.py
new file mode 100644
index 0000000000..a7a6eaf768
--- /dev/null
+++ b/plugins/flytekit-optuna/tests/test_callback.py
@@ -0,0 +1,91 @@
+from typing import Union
+
+import asyncio
+from typing import Union
+import optuna
+
+import flytekit as fl
+from flytekitplugins.optuna import Optimizer
+
+
+def test_callback():
+
+
+    @fl.task
+    async def objective(letter: str, number: Union[float, int], other: str, fixed: str) -> float:
+
+        loss = len(letter) + number + len(other) + len(fixed)
+
+        return float(loss)
+
+    def callback(trial: optuna.Trial, fixed: str):
+
+        letter = trial.suggest_categorical("booster", ["A", "B", "BLAH"])
+
+        if letter == "A":
+            number = trial.suggest_int("number_A", 1, 10)
+        elif letter == "B":
+            number = trial.suggest_float("number_B", 10., 20.)
+        else:
+            number = 10
+
+        other = trial.suggest_categorical("other", ["Something", "another word", "a phrase"])
+
+        return objective(letter, number, other, fixed)
+
+
+    @fl.eager
+    async def train(concurrency: int, n_trials: int) -> float:
+
+        study = optuna.create_study(direction="maximize")
+
+        optimizer = Optimizer(callback, concurrency=concurrency, n_trials=n_trials, study=study)
+
+        await optimizer(fixed="hello!")
+
+        return float(optimizer.study.best_value)
+
+    loss = asyncio.run(train(concurrency=2, n_trials=10))
+
+    assert isinstance(loss, float)
+
+
+def test_async_callback():
+
+    @fl.task
+    async def objective(letter: str, number: Union[float, int], other: str, fixed: str) -> float:
+
+        loss = len(letter) + number + len(other) + len(fixed)
+
+        return float(loss)
+
+    async def callback(trial: optuna.Trial, fixed: str):
+
+        letter = trial.suggest_categorical("booster", ["A", "B", "BLAH"])
+
+        if letter == "A":
+            number = trial.suggest_int("number_A", 1, 10)
+        elif letter == "B":
+            number = trial.suggest_float("number_B", 10., 20.)
+        else:
+            number = 10
+
+        other = trial.suggest_categorical("other", ["Something", "another word", "a phrase"])
+
+        return await objective(letter, number, other, fixed)
+
+
+    @fl.eager
+    async def train(concurrency: int, n_trials: int) -> float:
+
+        study = optuna.create_study(direction="maximize")
+
+        optimizer = Optimizer(callback, concurrency=concurrency, n_trials=n_trials, study=study)
+
+        await optimizer(fixed="hello!")
+
+        return float(optimizer.study.best_value)
+
+    loss = asyncio.run(train(concurrency=2, n_trials=10))
+
+    assert isinstance(loss, float)
diff --git a/plugins/flytekit-optuna/tests/test_decorator.py b/plugins/flytekit-optuna/tests/test_decorator.py
new file mode 100644
index 0000000000..89fe0d6480
--- /dev/null
+++ b/plugins/flytekit-optuna/tests/test_decorator.py
@@ -0,0 +1,72 @@
+from typing import Union
+import math
+
+import asyncio
+from typing import Union
+import optuna
+
+import flytekit as fl
+from flytekitplugins.optuna import optimize, suggest
+
+
+def test_local_exec():
+
+    @fl.eager
+    async def train(concurrency: int, n_trials: int) -> float:
+
+        @optimize(concurrency=concurrency, n_trials=n_trials)
+        @fl.task
+        async def optimizer(x: float, y: int, z: int, power: int) -> float:
+            return math.log((((x - 5) ** 2) + (y + 4) ** 4 + (3 * z - 3) ** 2)) ** power
+
+        await optimizer(
+            x=suggest.float(low=-10, high=10),
+            y=suggest.integer(low=-10, high=10),
+            z=suggest.category([-5, 0, 3, 6, 9]),
+            power=2,
+        )
+
+        return optimizer.study.best_value
+
+    loss = asyncio.run(train(concurrency=2, n_trials=10))
+
+    assert isinstance(loss, float)
+
+
+def test_callback():
+
+
+    @fl.task
+    async def objective(letter: str, number: Union[float, int], other: str, fixed: str) -> float:
+
+        loss = len(letter) + number + len(other) + len(fixed)
+
+        return float(loss)
+
+    @optimize(n_trials=10, concurrency=2)
+    def optimizer(trial: optuna.Trial, fixed: str):
+
+        letter = trial.suggest_categorical("booster", ["A", "B", "BLAH"])
+
+        if letter == "A":
+            number = trial.suggest_int("number_A", 1, 10)
+        elif letter == "B":
+            number = trial.suggest_float("number_B", 10., 20.)
+        else:
+            number = 10
+
+        other = trial.suggest_categorical("other", ["Something", "another word", "a phrase"])
+
+        return objective(letter, number, other, fixed)
+
+
+    @fl.eager
+    async def train(concurrency: int, n_trials: int) -> float:
+
+        await optimizer(fixed="hello!")
+
+        return float(optimizer.study.best_value)
+
+    loss = asyncio.run(train(concurrency=2, n_trials=10))
+
+    assert isinstance(loss, float)
diff --git a/plugins/flytekit-optuna/tests/test_imperative.py b/plugins/flytekit-optuna/tests/test_imperative.py
new file mode 100644
index 0000000000..10d9213878
--- /dev/null
+++ b/plugins/flytekit-optuna/tests/test_imperative.py
@@ -0,0 +1,69 @@
+from typing import Union
+
+import asyncio
+import math
+from typing import Union
+
+import flytekit as fl
+from flytekitplugins.optuna import Optimizer, suggest
+
+
+
+def test_local_exec():
+
+
+    @fl.task
+    async def objective(x: float, y: int, z: int, power: int) -> float:
+        return math.log((((x - 5) ** 2) + (y + 4) ** 4 + (3 * z - 3) ** 2)) ** power
+
+
+    @fl.eager
+    async def train(concurrency: int, n_trials: int) -> float:
+        optimizer = Optimizer(objective, concurrency=concurrency, n_trials=n_trials)
+
+        await optimizer(
+            x=suggest.float(low=-10, high=10),
+            y=suggest.integer(low=-10, high=10),
+            z=suggest.category([-5, 0, 3, 6, 9]),
+            power=2,
+        )
+
+        return optimizer.study.best_value
+
+    loss = asyncio.run(train(concurrency=2, n_trials=10))
+
+    assert isinstance(loss, float)
+
+
+
+def test_bundled_local_exec():
+
+    @fl.task
+    async def objective(suggestions: dict[str, Union[int, float]], z: int, power: int) -> float:
+
+        # building out a large set of typed inputs is exhausting, so we can just use a dict
+
+        x, y = suggestions["x"], suggestions["y"]
+
+        return math.log((((x - 5) ** 2) + (y + 4) ** 4 + (3 * z - 3) ** 2)) ** power
+
+
+    @fl.eager
+    async def train(concurrency: int, n_trials: int) -> float:
+        optimizer = Optimizer(objective, concurrency=concurrency, n_trials=n_trials)
+
+        suggestions = {
+            "x": suggest.float(low=-10, high=10),
+            "y": suggest.integer(low=-10, high=10),
+        }
+
+        await optimizer(
+            suggestions=suggestions,
+            z=suggest.category([-5, 0, 3, 6, 9]),
+            power=2,
+        )
+
+        return optimizer.study.best_value
+    loss = asyncio.run(train(concurrency=2, n_trials=10))
+
+    assert isinstance(loss, float)
diff --git a/plugins/flytekit-optuna/tests/test_optimizer.py b/plugins/flytekit-optuna/tests/test_optimizer.py
deleted file mode 100644
index 33150fd03d..0000000000
--- a/plugins/flytekit-optuna/tests/test_optimizer.py
+++ /dev/null
@@ -1,32 +0,0 @@
-import asyncio
-import math
-
-import flytekit as fl
-from flytekitplugins.optuna import Optimizer, suggest
-
-
-image = fl.ImageSpec(builder="union", packages=["flytekit==1.15.0b0", "optuna>=4.0.0"])
-
-@fl.task(container_image=image)
-async def objective(x: float, y: int, z: int, power: int) -> float:
-    return math.log((((x - 5) ** 2) + (y + 4) ** 4 + (3 * z - 3) ** 2)) ** power
-
-
-@fl.eager(container_image=image)
-async def train(concurrency: int, n_trials: int) -> float:
-    optimizer = Optimizer(objective, concurrency, n_trials)
-
-    await optimizer(
-        x=suggest.float(low=-10, high=10),
-        y=suggest.integer(low=-10, high=10),
-        z=suggest.category([-5, 0, 3, 6, 9]),
-        power=2,
-    )
-
-    return optimizer.study.best_value
-
-def test_local_exec():
-
-    loss = asyncio.run(train(concurrency=2, n_trials=10))
-
-    assert isinstance(loss, float)
diff --git a/plugins/flytekit-optuna/timeline.png b/plugins/flytekit-optuna/timeline.png
new file mode 100644
index 0000000000..473ac0a4f9
Binary files /dev/null and b/plugins/flytekit-optuna/timeline.png differ