Skip to content

Commit

Permalink
Avoid persisting Futures
Browse files Browse the repository at this point in the history
Closes dask#1003
  • Loading branch information
TomAugspurger committed Nov 26, 2024
1 parent ed8a2b7 commit b93d6dc
Showing 1 changed file with 31 additions and 2 deletions.
33 changes: 31 additions & 2 deletions dask_ml/model_selection/_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,19 @@ def get_futures(partial_fit_calls):
_specs[ident] = spec

if DISTRIBUTED_2021_02_0:
_models, _scores, _specs = dask.persist(_models, _scores, _specs)
# https://github.com/dask/dask-ml/issues/1003
# We only want to persist dask collections, not Futures.
# So we build a collection without futures and bring them back later.
to_persist = {
"models": {k: v for k, v in _models.items() if not isinstance(v, Future)},
"scores": {k: v for k, v in _scores.items() if not isinstance(v, Future)},
"specs": {k: v for k, v in _specs.items() if not isinstance(v, Future)},
}
models_p, scores_p, specs_p = dask.persist(*list(to_persist.values()))
# Update with keys not present, which should just be futures
_models = {**_models, **models_p}
_scores = {**_scores, **scores_p}
_specs = {**_specs, **specs_p}
else:
_models, _scores, _specs = dask.persist(
_models, _scores, _specs, priority={tuple(_specs.values()): -1}
Expand Down Expand Up @@ -315,7 +327,24 @@ def get_futures(partial_fit_calls):
_specs[ident] = spec

if DISTRIBUTED_2021_02_0:
_models2, _scores2, _specs2 = dask.persist(_models, _scores, _specs)
# https://github.com/dask/dask-ml/issues/1003
# We only want to persist dask collections, not Futures.
# So we build a collection without futures and bring them back later.
to_persist = {
"models": {
k: v for k, v in _models.items() if not isinstance(v, Future)
},
"scores": {
k: v for k, v in _scores.items() if not isinstance(v, Future)
},
"specs": {k: v for k, v in _specs.items() if not isinstance(v, Future)},
}
models2_p, scores2_p, specs2_p = dask.persist(*list(to_persist.values()))
# Update with keys not present, which should just be futures
_models2 = {**_models, **models2_p}
_scores2 = {**_scores, **scores2_p}
_specs2 = {**_specs, **specs2_p}

else:
_models2, _scores2, _specs2 = dask.persist(
_models, _scores, _specs, priority={tuple(_specs.values()): -1}
Expand Down

0 comments on commit b93d6dc

Please sign in to comment.