add suggestion bundle support #3041
Conversation
Code Review Agent Run #411aad (Actionable Suggestions: 3)
Changelist by Bito: This pull request implements the following key changes.
Code Review Agent Run #c016c8 (Actionable Suggestions: 3, Additional Suggestions: 1)
Codecov Report: All modified and coverable lines are covered by tests ✅

```
@@            Coverage Diff             @@
##           master    #3041       +/-   ##
==========================================
- Coverage   82.79%   78.60%     -4.20%
==========================================
  Files           3      202      +199
  Lines         186    21355    +21169
  Branches        0     2744     +2744
==========================================
+ Hits          154    16786    +16632
- Misses         32     3812     +3780
- Partials        0      757      +757
```

☔ View full report in Codecov by Sentry.
Code Review Agent Run #b3b166 (Actionable Suggestions: 0, Additional Suggestions: 1)
Code Review Agent Run #af2f75 (Actionable Suggestions: 1, Additional Suggestions: 1)
Code Review Agent Run #b54cd9 (Actionable Suggestions: 0)
Code Review Agent Run #7205fa (Actionable Suggestions: 0)
Another example:

```python
import asyncio

import flytekit as fl
import numpy as np
import optuna
import plotly
import sklearn.datasets
import sklearn.metrics
import xgboost as xgb
from sklearn.model_selection import train_test_split

from flytekitplugins.optuna import Optimizer, suggest

image = fl.ImageSpec(
    builder="union",
    packages=[
        "flytekit==1.15.0b0",
        "optuna>=4.0.0",
        "scikit-learn>=1.6.0",
        "xgboost>=2.1.3",
        "plotly>=5.24.1",
    ],
)


@fl.task(container_image=image)
async def objective(params: dict[str, int | float | str]) -> float:
    data, target = sklearn.datasets.load_breast_cancer(return_X_y=True)
    train_x, valid_x, train_y, valid_y = train_test_split(data, target, test_size=0.25)

    dtrain = xgb.DMatrix(train_x, label=train_y)
    dvalid = xgb.DMatrix(valid_x, label=valid_y)

    bst = xgb.train(params, dtrain)

    valid_yhat = bst.predict(dvalid)
    accuracy = sklearn.metrics.accuracy_score(valid_y, np.rint(valid_yhat))
    return accuracy


@fl.eager(container_image=image)
async def train(concurrency: int, n_trials: int):
    study = optuna.create_study(direction="maximize")
    optimizer = Optimizer(objective, concurrency, n_trials, study)

    params = {
        "lambda": suggest.float(1e-8, 1.0, log=True),
        "alpha": suggest.float(1e-8, 1.0, log=True),
        "subsample": suggest.float(0.2, 1.0),
        "colsample_bytree": suggest.float(0.2, 1.0),
        "max_depth": suggest.integer(3, 9, step=2),
        "min_child_weight": suggest.integer(2, 10),
        "eta": suggest.float(1e-8, 1.0, log=True),
        "gamma": suggest.float(1e-8, 1.0, log=True),
        "grow_policy": suggest.category(["depthwise", "lossguide"]),
        "sample_type": suggest.category(["uniform", "weighted"]),
        "normalize_type": suggest.category(["tree", "forest"]),
        "rate_drop": suggest.float(1e-8, 1.0, log=True),
        "skip_drop": suggest.float(1e-8, 1.0, log=True),
        "objective": "binary:logistic",
        "tree_method": "exact",
        "booster": "dart",
    }

    await optimizer(params=params)
```
In some extreme cases, it is useful to be able to define suggestions programmatically. However, this cannot happen within an objective task, because we need to tell the Study what each task is doing. Therefore, users should be able to define a callback. This is complicated, but I think it is the best solution. It takes advantage of Flyte's static typing and enables caching too.

```python
import asyncio
from typing import Any

import flytekit as fl
import numpy as np
import optuna
import plotly
import sklearn.datasets
import sklearn.metrics
import xgboost as xgb
from sklearn.model_selection import train_test_split

from optimizer import Optimizer

image = fl.ImageSpec(
    builder="union",
    packages=[
        "flytekit==1.15.0b0",
        "optuna>=4.0.0",
        "scikit-learn>=1.6.0",
        "xgboost>=2.1.3",
        "plotly>=5.24.1",
    ],
)


@fl.task(container_image=image)
async def objective(params: dict[str, int | float | str]) -> float:
    data, target = sklearn.datasets.load_breast_cancer(return_X_y=True)
    train_x, valid_x, train_y, valid_y = train_test_split(data, target, test_size=0.25)

    dtrain = xgb.DMatrix(train_x, label=train_y)
    dvalid = xgb.DMatrix(valid_x, label=valid_y)

    bst = xgb.train(params, dtrain)

    valid_yhat = bst.predict(dvalid)
    accuracy = sklearn.metrics.accuracy_score(valid_y, np.rint(valid_yhat))
    print(accuracy)
    return accuracy


@fl.eager(container_image=image)
async def train(concurrency: int, n_trials: int):
    study = optuna.create_study(direction="maximize")

    def callback(trial: optuna.Trial, inputs: dict[str, Any]) -> dict[str, Any]:
        params = {
            "verbosity": 0,
            "objective": "binary:logistic",
            # use exact for small dataset.
            "tree_method": "exact",
            # defines booster, gblinear for linear functions.
            "booster": trial.suggest_categorical("booster", ["gbtree", "gblinear", "dart"]),
            # L2 regularization weight.
            "lambda": trial.suggest_float("lambda", 1e-8, 1.0, log=True),
            # L1 regularization weight.
            "alpha": trial.suggest_float("alpha", 1e-8, 1.0, log=True),
            # sampling ratio for training data.
            "subsample": trial.suggest_float("subsample", 0.2, 1.0),
            # sampling according to each tree.
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.2, 1.0),
        }

        if params["booster"] in ["gbtree", "dart"]:
            # maximum depth of the tree, signifies complexity of the tree.
            params["max_depth"] = trial.suggest_int("max_depth", 3, 9, step=2)
            # minimum child weight, larger the term more conservative the tree.
            params["min_child_weight"] = trial.suggest_int("min_child_weight", 2, 10)
            params["eta"] = trial.suggest_float("eta", 1e-8, 1.0, log=True)
            # defines how selective algorithm is.
            params["gamma"] = trial.suggest_float("gamma", 1e-8, 1.0, log=True)
            params["grow_policy"] = trial.suggest_categorical("grow_policy", ["depthwise", "lossguide"])

        if params["booster"] == "dart":
            params["sample_type"] = trial.suggest_categorical("sample_type", ["uniform", "weighted"])
            params["normalize_type"] = trial.suggest_categorical("normalize_type", ["tree", "forest"])
            params["rate_drop"] = trial.suggest_float("rate_drop", 1e-8, 1.0, log=True)
            params["skip_drop"] = trial.suggest_float("skip_drop", 1e-8, 1.0, log=True)

        return dict(params=params)

    optimizer = Optimizer(objective, concurrency, n_trials, study, callback)
    await optimizer()

    show("History", optuna.visualization.plot_optimization_history(optimizer.study))
    show("Timeline", optuna.visualization.plot_timeline(optimizer.study))
    show("Coordinates", optuna.visualization.plot_parallel_coordinate(optimizer.study))
    show("Importance", optuna.visualization.plot_param_importances(optimizer.study))


def show(name: str, fig: plotly.graph_objects.Figure):
    html: str = plotly.io.to_html(
        fig=fig,
        config={
            "scrollZoom": False,
            "displayModeBar": False,
            "doubleClick": "reset",
        },
    )
    fl.Deck(name, html)


if __name__ == "__main__":
    loss = asyncio.run(train(concurrency=1, n_trials=10))
    print(loss)
```
Code Review Agent Run #a3ee63 (Actionable Suggestions: 0)
Code Review Agent Run #74ccb6 (Actionable Suggestions: 0)
Code Review Agent Run #bf90ad (Actionable Suggestions: 5)
```python
if self.callback is not None:
    promise = self.callback(trial=trial, **inputs)
else:
    promise = self.objective(**process(trial, inputs))
```
Consider extracting the trial execution logic into a separate method to improve code organization and reusability. The conditional logic for handling callback vs direct objective execution could be cleaner in its own method.
Code suggestion (check the AI-generated fix before applying):

```diff
-if self.callback is not None:
-    promise = self.callback(trial=trial, **inputs)
-else:
-    promise = self.objective(**process(trial, inputs))
+promise = await self._execute_trial(trial, inputs)
+
+async def _execute_trial(self, trial: optuna.Trial, inputs: dict[str, Any]) -> Awaitable[Union[float, tuple[float, ...]]]:
+    if self.callback is not None:
+        return await self.callback(trial=trial, **inputs)
+    return await self.objective(**process(trial, inputs))
```
Code Review Run #bf90ad
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
```python
study = optuna.create_study(direction="maximize")

optimizer = Optimizer(concurrency=concurrency, n_trials=n_trials, study=study, callback=callback)
```
Consider passing the objective function directly to the Optimizer constructor. The current implementation relies on the callback to invoke the objective function, which may not be the intended pattern.
Code suggestion (check the AI-generated fix before applying):

```diff
-optimizer = Optimizer(concurrency=concurrency, n_trials=n_trials, study=study, callback=callback)
+optimizer = Optimizer(objective=objective, concurrency=concurrency, n_trials=n_trials, study=study, callback=callback)
```
Code Review Run #bf90ad
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
```python
concurrency: int
n_trials: int
study: Optional[optuna.Study] = None
objective: Optional[Union[PythonFunctionTask, PythonFunctionWorkflow]] = None
```
The removal of the required objective field from the class definition may cause issues, since it is used in the __post_init__ method. Consider keeping it as a required field or updating the initialization logic.
Code suggestion (check the AI-generated fix before applying):

```diff
@@ -55,6 +55,7 @@
 class Optimizer:
     concurrency: int
     n_trials: int
+    objective: Union[PythonFunctionTask, PythonFunctionWorkflow]
     study: Optional[optuna.Study] = None
     callback: Optional[Callable[Concatenate[optuna.Trial, P], Awaitable[Union[float, tuple[float, ...]]]]] = None
```
Code Review Run #bf90ad
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
```python
if isinstance(self.objective, PythonFunctionTask):
    func = self.objective.task_function
elif isinstance(self.objective, PythonFunctionWorkflow):
    func = self.objective._workflow_function
```
Consider using a public interface instead of accessing the private member '_workflow_function'.
Code suggestion (check the AI-generated fix before applying):

```diff
-func = self.objective._workflow_function
+func = self.objective.workflow_function
```
Code Review Run #bf90ad
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Code Review Agent Run #0ff5f6 (Actionable Suggestions: 5)
```python
out = self.objective(trial=trial, **inputs)
result = out if not inspect.isawaitable(out) else await out
```
Consider handling potential exceptions when awaiting the objective function result. The current implementation may not properly handle errors that occur during execution of the objective function.
Code suggestion (check the AI-generated fix before applying):

```diff
-out = self.objective(trial=trial, **inputs)
-result = out if not inspect.isawaitable(out) else await out
+try:
+    out = self.objective(trial=trial, **inputs)
+    result = out if not inspect.isawaitable(out) else await out
+except Exception as e:
+    self.study.tell(trial, state=optuna.trial.TrialState.FAIL)
+    raise e
```
Code Review Run #0ff5f6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
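As background on the sync/async dispatch pattern discussed in the thread above, here is a generic standalone Python sketch; the function names are illustrative and are not the plugin's code:

```python
import asyncio
import inspect


def sync_objective(x: float) -> float:
    return x * 2


async def async_objective(x: float) -> float:
    await asyncio.sleep(0)
    return x * 2


async def call_objective(objective, x: float) -> float:
    # Call first, then await only if the result turned out to be awaitable,
    # so plain functions and coroutine functions are handled uniformly.
    out = objective(x)
    return await out if inspect.isawaitable(out) else out


async def main() -> None:
    print(await call_objective(sync_objective, 1.0))   # 2.0
    print(await call_objective(async_objective, 2.0))  # 4.0


if __name__ == "__main__":
    asyncio.run(main())
```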
```diff
 @dataclass
 class Optimizer:
-    objective: Union[PythonFunctionTask, PythonFunctionWorkflow]
+    objective: Union[CallbackType, PythonFunctionTask]
```
Consider adding type hints for the generic parameter P in the objective type annotation to maintain consistency with the CallbackType definition.
Code suggestion (check the AI-generated fix before applying):

```diff
-objective: Union[CallbackType, PythonFunctionTask]
+objective: Union[CallbackType[P], PythonFunctionTask]
```
Code Review Run #0ff5f6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
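For readers unfamiliar with the ParamSpec/Concatenate typing referenced in this comment, a minimal standalone sketch of such a callback alias follows; the names here are illustrative and not taken from the plugin:

```python
from typing import Any, Awaitable, Callable, Concatenate, ParamSpec, Union

import optuna

P = ParamSpec("P")

# A callback takes an optuna.Trial first, then whatever extra parameters P
# captures, and returns an awaitable objective value.
CallbackType = Callable[Concatenate[optuna.Trial, P], Awaitable[Union[float, tuple[float, ...]]]]


async def example_callback(trial: optuna.Trial, inputs: dict[str, Any]) -> float:
    # An async function of this shape is compatible with CallbackType.
    x = trial.suggest_float("x", 0.0, 1.0)
    return x * inputs.get("scale", 1.0)
```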
```python
def optimize(concurrency: int, n_trials: int, study: Optional[optuna.Study] = None):
    def decorator(func):
```
Consider adding type hints for the return type of the decorator function and for the decorated function type. This would improve type safety and code clarity.
Code suggestion (check the AI-generated fix before applying):

```diff
-def optimize(concurrency: int, n_trials: int, study: Optional[optuna.Study] = None):
-    def decorator(func):
+def optimize(concurrency: int, n_trials: int, study: Optional[optuna.Study] = None) -> Callable[[CallbackType], Optimizer]:
+    def decorator(func: CallbackType) -> Optimizer:
```
Code Review Run #0ff5f6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
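For context, a hedged sketch of how the decorator ("declarative") style discussed in this thread might be used, inferred only from the optimize(...) signature quoted above; the import path and exact runtime behavior are assumptions, not confirmed here:

```python
import optuna

from flytekitplugins.optuna import optimize  # import location assumed


# Per the signature above, optimize(...) returns a decorator that wraps a
# callback (optuna.Trial first, then extra inputs) into an Optimizer with the
# given concurrency and number of trials. In practice the body would typically
# launch the objective task rather than compute a value directly.
@optimize(concurrency=2, n_trials=10)
async def tuner(trial: optuna.Trial, center: float) -> float:
    x = trial.suggest_float("x", -10.0, 10.0)
    return -((x - center) ** 2)
```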
Why are the changes needed?
I recently created the flytekitplugins.optuna plugin. This PR adds support for bundling all of the suggestions into any number of dictionaries.

This is for the cases in which one wants to add dozens or more hyperparameters. Adding each of them as a typed function input is pretty exhausting. With this change, we can collect all of these suggestions into any number of dictionaries at runtime, as sketched below.
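To make this concrete, here is a condensed sketch of the bundling pattern, reusing the Optimizer/suggest API and the objective task from the full example earlier in this thread; the import of that task from a local example module is a hypothetical placeholder:

```python
import flytekit as fl
import optuna

from flytekitplugins.optuna import Optimizer, suggest

from example import objective  # hypothetical module holding the @fl.task objective shown earlier


@fl.eager
async def tune(concurrency: int, n_trials: int):
    study = optuna.create_study(direction="maximize")
    optimizer = Optimizer(objective, concurrency, n_trials, study)

    # Suggestions and fixed values are bundled into a single dict and passed as
    # one keyword argument, rather than declaring every hyperparameter as its
    # own typed task input.
    params = {
        "eta": suggest.float(1e-8, 1.0, log=True),  # tuned by Optuna
        "max_depth": suggest.integer(3, 9),         # tuned by Optuna
        "booster": "dart",                          # fixed value
    }
    await optimizer(params=params)
```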
What changes were proposed in this pull request?
I add support for bundled suggestions to flytekitplugins.optuna. Users should provide any kwargs of type dict[str, Suggestion | Any].

How was this patch tested?
I have added an example and unit tests for the new bundling functionality.
Summary by Bito
Enhanced the flytekit-optuna plugin with bundled hyperparameter suggestions and improved type safety. The implementation includes refactoring the Optimizer class to support both imperative and declarative styles, with an instance-based suggest method and parameter organization using parent-child relationships. Added proper callback handling and streamlined handling of many hyperparameters through runtime dictionary collection. Includes comprehensive test coverage for both synchronous and asynchronous execution paths. Unit tests added: True
Estimated effort to review (1-5, lower is better): 2