Skip to content

Commit

Permalink
Add meta inference for ParallelPostFit.predict for sparse arrays (#889)
Browse files Browse the repository at this point in the history

* Added work-around for sparse matrices

Co-authored-by: Genevieve Buckley <[email protected]>
  • Loading branch information
VibhuJawa and GenevieveBuckley authored Nov 30, 2021
1 parent cf24100 commit 94e5261
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 6 deletions.
34 changes: 29 additions & 5 deletions dask_ml/wrappers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Meta-estimators for parallelizing estimators using the scikit-learn API."""
import logging
import warnings

import dask.array as da
import dask.dataframe as dd
Expand Down Expand Up @@ -662,9 +663,32 @@ def _get_output_dask_ar_meta_for_estimator(model_fn, estimator, input_dask_ar):
"""
# sklearn fails if the input array has size zero
# It requires at least 1 sample to run successfully
ar = np.zeros(
shape=(1, input_dask_ar.shape[1]),
dtype=input_dask_ar.dtype,
like=input_dask_ar._meta,
)
input_meta = input_dask_ar._meta
if hasattr(input_meta, "__array_function__"):
ar = np.zeros(
shape=(1, input_dask_ar.shape[1]),
dtype=input_dask_ar.dtype,
like=input_meta,
)
elif "scipy.sparse" in type(input_meta).__module__:
# sparse matrices don't support
# `like` because they do not implement __array_function__
# Refer to https://github.com/scipy/scipy/issues/10362
# Note: the code below works for both cupy and scipy sparse matrices
ar = type(input_meta)((1, input_dask_ar.shape[1]), dtype=input_dask_ar.dtype)
else:
func_name = model_fn.__name__.strip("_")
msg = (
f"Metadata for {func_name} is not provided, so Dask is "
f"running the {func_name} "
"function on a small dataset to guess output metadata. "
"As a result, It is possible that Dask will guess incorrectly.\n"
"To silence this warning, provide explicit "
f"`{func_name}_meta` to the dask_ml.wrapper."
"\nExample: \n"
"wrap_clf = dask_ml.wrappers.Incremental(GradientBoostingClassifier(), "
f"{func_name}_meta = np.array([1],dtype=np.int8))"
)
warnings.warn(msg)
ar = np.zeros(shape=(1, input_dask_ar.shape[1]), dtype=input_dask_ar.dtype)
return model_fn(ar, estimator)
25 changes: 25 additions & 0 deletions tests/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import sklearn.datasets
import sklearn.model_selection
from dask.array.utils import assert_eq
from scipy.sparse import csr_matrix
from sklearn.base import clone
from sklearn.linear_model import SGDClassifier, SGDRegressor
from sklearn.pipeline import make_pipeline
Expand Down Expand Up @@ -210,3 +211,27 @@ def test_incremental_text_pipeline(container):

X2.compute_chunk_sizes()
assert X2.shape == (300, vect.n_features)

preds = pipe.predict(X).compute()
assert len(y) == len(preds)


def test_incremental_sparse_inputs():
    """Check ``Incremental`` fit/predict on a dask array backed by a CSR matrix.

    An ``Incremental``-wrapped ``SGDClassifier`` is fitted on dask arrays built
    from a scipy sparse matrix, and its computed predictions are compared
    (ignoring dtype) with those of a plain ``SGDClassifier`` fitted on the
    same in-memory data.
    """
    features = csr_matrix((3, 4))
    labels = np.asarray([0, 1, 1], dtype=np.int32)

    # One row per chunk so that every block is itself a sparse matrix.
    features_da = da.from_array(features, chunks=(1, 4))
    labels_da = da.from_array(labels, chunks=1)

    reference = SGDClassifier(tol=1e-3)
    incremental = dask_ml.wrappers.Incremental(
        SGDClassifier(tol=1e-3),
        scoring="accuracy",
        assume_equal_chunks=True,
    )

    # Wrapped estimator first, then the reference one (same order of fitting).
    incremental = incremental.fit(features_da, labels_da, classes=[0, 1])
    wrapped_predictions = incremental.predict(features_da).compute()

    reference = reference.fit(features, labels)
    reference_predictions = reference.predict(features).astype(np.int64)

    assert_eq(reference_predictions, wrapped_predictions, ignore_dtype=True)
60 changes: 59 additions & 1 deletion tests/test_parallel_post_fit.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
import pandas as pd
import pytest
import sklearn.datasets
from scipy.sparse import csr_matrix
from sklearn.decomposition import PCA
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.linear_model import LinearRegression, LogisticRegression, SGDClassifier
from sklearn.naive_bayes import CategoricalNB
from sklearn.preprocessing import OneHotEncoder

Expand Down Expand Up @@ -231,3 +232,60 @@ def test_auto_rechunk():
X = X.rechunk({0: 100, 1: 10})
X._chunks = (tuple(np.nan for _ in X.chunks[0]), X.chunks[1])
clf.predict(X)


def test_sparse_inputs():
    """Check ``ParallelPostFit.predict`` on a dask array backed by a CSR matrix.

    The wrapped estimator is fitted directly on the scipy sparse matrix; the
    wrapper's computed predictions on the dask array must equal the
    estimator's own predictions on the in-memory matrix.
    """
    sparse_features = csr_matrix((3, 4))
    labels = np.asarray([0, 0, 1], dtype=np.int32)

    fitted = SGDClassifier(tol=1e-3).fit(sparse_features, labels)
    wrapped = ParallelPostFit(fitted)

    # One row per chunk so that every block is itself a sparse matrix.
    dask_features = da.from_array(sparse_features, chunks=(1, 4))

    actual = wrapped.predict(dask_features).compute()
    assert_eq_ar(actual, fitted.predict(sparse_features))


def test_warning_on_dask_array_without_array_function():
    """Check the meta-inference warning for metas lacking ``__array_function__``.

    A dask array whose ``_meta`` is replaced by an object that defines no
    ``__array_function__`` is passed to ``predict`` and ``predict_proba``;
    each call must emit a ``UserWarning`` asking for an explicit
    ``<method>_meta``.
    """
    features, target = make_classification(n_samples=10, n_features=2, chunks=10)
    clf = ParallelPostFit(GradientBoostingClassifier()).fit(features, target)

    class FakeArray:
        """Thin wrapper forwarding a few array attributes to ``value``.

        It deliberately defines no ``__array_function__``.
        """

        def __init__(self, value):
            self.value = value

        @property
        def ndim(self):
            return self.value.ndim

        @property
        def len(self):
            # NOTE(review): numpy arrays expose no ``.len`` attribute, so
            # reading this property would raise AttributeError; kept as-is.
            return self.value.len

        @property
        def dtype(self):
            return self.value.dtype

        @property
        def shape(self):
            return self.value.shape

    wrapped_values = FakeArray(np.zeros(shape=(2, 2)))
    fake_dask_ar = da.from_array(wrapped_values)
    # Override the inferred meta with an object lacking __array_function__.
    fake_dask_ar._meta = FakeArray(np.zeros(shape=(0, 0)))

    expected_warnings = (
        ("predict", "provide explicit `predict_meta` to the dask_ml.wrapper"),
        (
            "predict_proba",
            "provide explicit `predict_proba_meta` to the dask_ml.wrapper",
        ),
    )
    for method_name, pattern in expected_warnings:
        with pytest.warns(UserWarning, match=pattern):
            getattr(clf, method_name)(fake_dask_ar)

0 comments on commit 94e5261

Please sign in to comment.