Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-15936 add data frame transformation using polars #15942

Merged
merged 12 commits into from
Dec 6, 2023
64 changes: 34 additions & 30 deletions h2o-py/h2o/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
from h2o.utils.config import get_config_value
from h2o.utils.metaclass import deprecated_fn
from h2o.utils.shared_utils import(gen_header, is_list, is_list_of_lists, is_str_list, py_tmp_key, quoted,
can_use_pandas, can_use_numpy, quote, normalize_slice, slice_is_normalized,
check_frame_id, can_use_datatable)
can_use_pandas, can_use_numpy, quote, normalize_slice, slice_is_normalized,
check_frame_id, can_use_datatable, can_use_polars, can_use_pyarrow)
from h2o.utils.threading import local_context, local_env
from h2o.utils.typechecks import (assert_is_type, assert_satisfies, Enum, I, is_type, numeric, numpy_ndarray,
numpy_datetime, pandas_dataframe, pandas_timestamp, scipy_sparse, U)
Expand Down Expand Up @@ -1942,15 +1942,15 @@ def structure(self):
else:
print("num {}".format(" ".join(it[0] if it else "nan" for it in h2o.as_list(self[:10, i], False)[1:])))

def as_data_frame(self, use_pandas=True, header=True, multi_thread=False):
def as_data_frame(self, use_pandas=True, header=True):
"""
Obtain the dataset as a python-local object.

:param bool use_pandas: If True (default) then return the H2OFrame as a pandas DataFrame (requires that the
``pandas`` library was installed). If False, then return the contents of the H2OFrame as plain nested
list, in a row-wise order.
:param bool multi_thread: if True and if use_pandas is True, will use datatable to speedup the conversion from
H2O frame to pandas frame that uses multi-thread.
list, in a row-wise order. The conversion to pandas frame will use multi-thread whenever
possible with the right python modules (datatable or polars and pyarrow) installed. Otherwise, single
thread operation will be used in the conversion.
:param bool header: If True (default), then column names will be appended as the first row in list

:returns: A python object (a list of lists of strings, each list is a row, if ``use_pandas=False``, otherwise
Expand All @@ -1969,35 +1969,39 @@ def as_data_frame(self, use_pandas=True, header=True, multi_thread=False):
"""
if can_use_pandas() and use_pandas:
import pandas
if multi_thread:
if can_use_datatable():
try:
tmpdir = tempfile.mkdtemp()
fileName = os.path.join(tmpdir, "h2oframe2Convert.csv")
h2o.export_file(self, fileName)
#h2o.download_csv(self, fileName)
import datatable as dt
frameTypes = self.types
validFrameTypes = {}
for key, value in frameTypes.items():
if value.startswith('int'):
validFrameTypes[key] = dt.int64
elif value.startswith("real"):
validFrameTypes[key] = dt.float64
dt_frame = dt.fread(fileName, na_strings=[""], columns=validFrameTypes)
return dt_frame.to_pandas()
finally:
os.remove(fileName)
os.rmdir(tmpdir)
elif not(can_use_datatable()):
warnings.warn("multi_thread mode can only be used when you have datatable "
"installed. Defaults to single-thread operation.")
return pandas.read_csv(StringIO(self.get_frame_data()), low_memory=False, skip_blank_lines=False)
if (can_use_datatable()) or (can_use_polars() and can_use_pyarrow()): # can use multi-thread
with tempfile.NamedTemporaryFile(suffix=".h2oframe2Convert.csv") as exportFile:
h2o.export_file(self, exportFile.name, force=True)
if can_use_datatable(): # use datatable for multi-thread by default
return self.convert_with_datatable(exportFile.name)
elif can_use_polars() and can_use_pyarrow(): # polar/pyarrow if datatable is not available
return self.convert_with_polars(exportFile.name)
warnings.warn("converting H2O frame to pandas dataframe using single-thread. For faster conversion using"
" multi-thread, install datatable, or polars and pyarrow.")
return pandas.read_csv(StringIO(self.get_frame_data()), low_memory=False, skip_blank_lines=False)

from h2o.utils.csv.readers import reader
frame = [row for row in reader(StringIO(self.get_frame_data()))]
if not header:
frame.pop(0)
return frame

def convert_with_polars(selfself, fileName):
import polars as pl
dt_frame = pl.read_csv(fileName, null_values = "")
return dt_frame.to_pandas()

def convert_with_datatable(self, fileName):
import datatable as dt
frameTypes = self.types
validFrameTypes = {}
for key, value in frameTypes.items():
if value.startswith('int'):
validFrameTypes[key] = dt.int64
elif value.startswith("real"):
validFrameTypes[key] = dt.float64
dt_frame = dt.fread(fileName, na_strings=[""], columns=validFrameTypes)
return dt_frame.to_pandas()

def save_to_hive(self, jdbc_url, table_name, format="csv", table_path=None, tmp_path=None):
"""
Expand Down
18 changes: 9 additions & 9 deletions h2o-py/h2o/utils/shared_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __subclasshook__(cls, C):
from h2o.backend.server import H2OLocalServer
from h2o.exceptions import H2OValueError
from h2o.utils.typechecks import assert_is_type, is_type, numeric
from h2o.utils.threading import local_env

_id_ctr = 0

Expand Down Expand Up @@ -120,26 +121,29 @@ def temp_ctr():


def is_module_available(mod):
if local_env(mod+"_disabled"): # fast track if module is explicitly disabled
return False
if mod in sys.modules and sys.modules[mod] is not None: # fast track + safer in unusual environments
return True

import importlib.util
return importlib.util.find_spec(mod) is not None


def can_use_pandas():
return is_module_available('pandas')

def can_install_datatable():
return sys.version_info.major == 3 and sys.version_info.minor <= 9

def can_use_datatable():
return is_module_available('datatable')

def can_use_polars():
return is_module_available('polars')

def can_use_pyarrow():
return is_module_available('pyarrow')

def can_use_numpy():
return is_module_available('numpy')


_url_safe_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~"
_url_chars_map = [chr(i) if chr(i) in _url_safe_chars else "%%%02X" % i for i in range(256)]

Expand All @@ -148,20 +152,16 @@ def url_encode(s):
# Note: type cast str(s) will not be needed once all code is made compatible
return "".join(_url_chars_map[c] for c in bytes_iterator(s))


def quote(s):
return url_encode(s)


def clamp(x, xmin, xmax):
"""Return the value of x, clamped from below by `xmin` and from above by `xmax`."""
return max(xmin, min(x, xmax))


def _gen_header(cols):
return ["C" + str(c) for c in range(1, cols + 1, 1)]


def stringify_dict(d):
return stringify_list(["{'key': %s, 'value': %s}" % (_quoted(k), v) for k, v in d.items()])

Expand Down
15 changes: 14 additions & 1 deletion h2o-py/tests/pyunit_utils/utilsPY.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
H2OGeneralizedAdditiveEstimator, H2OKMeansEstimator, H2ONaiveBayesEstimator, H2OInfogram, \
H2ORandomForestEstimator, H2OPrincipalComponentAnalysisEstimator
from h2o.utils.typechecks import is_type
from h2o.utils.shared_utils import temp_ctr # unused in this file but exposed here for symmetry with rest_ctr
from h2o.utils.shared_utils import can_use_pandas


class TemporaryDirectory:
Expand Down Expand Up @@ -4745,3 +4745,16 @@ def prepare_data():
y = 'Y'

return df, x, y

def can_install_datatable():
return sys.version_info.major == 3 and sys.version_info.minor <= 9

def can_install_polars():
return sys.version_info.major == 3 and sys.version_info.minor > 9

def can_install_pyarrow():
if can_use_pandas() and sys.version_info.minor > 9:
import pandas
return sys.version_info.major == 3 and float(pandas.__version__[0]) >= 1
else:
return False
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import sys
sys.path.insert(1,"../../")
import h2o
from tests import pyunit_utils
from h2o.utils.shared_utils import (can_use_datatable, can_use_polars, can_use_pyarrow)
import time
import pandas as pd
from h2o.utils.threading import local_context

# if datatable or polars/pyarrow is installed, this test will show that using datatable to convert h2o frame to pandas
# frame is much faster for large datasets.
def test_frame_conversion(dataset, original_pandas_frame, module):
# convert frame using datatable or polar
h2oFrame = h2o.import_file(pyunit_utils.locate(dataset))
startT = time.time()
new_pandas_frame = h2oFrame.as_data_frame()
newTime = time.time()-startT
print("H2O frame to Pandas frame conversion time with multi-thread using module {1} for dataset {2}: {0}".format(newTime, module, dataset))
# compare two frames column types
new_types = new_pandas_frame.dtypes
old_types = original_pandas_frame.dtypes
ncol = h2oFrame.ncol
colNames = new_pandas_frame.columns

for ind in list(range(ncol)):
assert new_types[colNames[ind]] == old_types[colNames[ind]], "Expected column types: {0}, actual column types: " \
"{1}".format(old_types[colNames[ind]], new_types[colNames[ind]])
if new_types[colNames[ind]] == "object":
diff = new_pandas_frame[colNames[ind]] == original_pandas_frame[colNames[ind]]
if not diff.all(): # difference caused by the presence of NAs
newSeries = pd.Series(new_pandas_frame[colNames[ind]])
newNA = newSeries.isna()
oldSeries = pd.Series(original_pandas_frame[colNames[ind]])
oldNA = oldSeries.isna()
assert (newNA==oldNA).all()
else:
diff = (new_pandas_frame[colNames[ind]] - original_pandas_frame[colNames[ind]]).abs()
assert diff.max() < 1e-10

def singl_thread_pandas_conversion(dataset):
with local_context(datatable_disabled=True, polars_disabled=True):
print("converting h2o frame to pandas frame using single thread")
h2oFrame = h2o.import_file(pyunit_utils.locate(dataset))
startT = time.time()
h2oframe_panda = h2oFrame.as_data_frame()
newTime = time.time()-startT
print("H2O frame to Pandas frame conversion time with single thread for dataset {1}: {0}".format(newTime, dataset))
return h2oframe_panda

def test_polars_datatable():
file1 = "smalldata/titanic/titanic_expanded.csv"
file2 = "smalldata/glm_test/multinomial_3Class_10KRow.csv"
file3 = "smalldata/timeSeries/CreditCard-ts_train.csv"

if pyunit_utils.can_install_datatable() or (pyunit_utils.can_install_polars() and pyunit_utils.can_install_pyarrow()):
original_converted_frame1 = singl_thread_pandas_conversion(file1)
original_converted_frame2 = singl_thread_pandas_conversion(file2)
original_converted_frame3 = singl_thread_pandas_conversion(file3)

with local_context(polars_disabled=True): # run with datatable
if pyunit_utils.can_install_datatable():
if not(can_use_datatable()):
pyunit_utils.install("datatable")
wendycwong marked this conversation as resolved.
Show resolved Hide resolved
print("test data frame conversion using datatable.")
test_frame_conversion(file1, original_converted_frame1, "datatable")
test_frame_conversion(file2, original_converted_frame2, "datatable")
test_frame_conversion(file3, original_converted_frame3, "datatable")
else:
print("datatable is not available. Skipping tests using datatable.")

with local_context(datatable_disabled=True):
if pyunit_utils.can_install_polars() and pyunit_utils.can_install_pyarrow():
if not(can_use_polars()):
pyunit_utils.install('polars')
if not(can_use_pyarrow()):
pyunit_utils.install('pyarrow')
wendycwong marked this conversation as resolved.
Show resolved Hide resolved

print("test data frame conversion using polars and pyarrow.")
test_frame_conversion(file1, original_converted_frame1, "polars and pyarrow")
test_frame_conversion(file2, original_converted_frame2, "polars and pyarrow")
test_frame_conversion(file3, original_converted_frame3, "polars and pyarrow")
else:
print("polars, pyarrow are not available. Skipping tests using polars and pyarrow")
else:
print("datatable or polars and pyarrow are not available to test. Skipping tests using polars and pyarrow.")



if __name__ == "__main__":
pyunit_utils.standalone_test(test_polars_datatable)
else:
test_polars_datatable()
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import sys
sys.path.insert(1,"../../")
import h2o
from tests import pyunit_utils
from h2o.utils.shared_utils import (can_use_datatable, can_use_polars, can_use_pyarrow)
import time
from h2o.utils.threading import local_context

def test_frame_conversion(dataset, original_pandas_frame, module):
# convert frame using datatable or polar
h2oFrame = h2o.import_file(pyunit_utils.locate(dataset))
startT = time.time()
new_pandas_frame = h2oFrame.as_data_frame()
newTime = time.time()-startT
print("H2O frame to Pandas frame conversion time with multi-thread using module {1} for dataset {2}: {0}".format(newTime, module, dataset))
# compare two frames column types
new_types = new_pandas_frame.dtypes
old_types = original_pandas_frame.dtypes
ncol = h2oFrame.ncol
colNames = new_pandas_frame.columns

for ind in list(range(ncol)):
assert new_types[colNames[ind]] == old_types[colNames[ind]], "Expected column types: {0}, actual column types: " \
"{1}".format(old_types[colNames[ind]], new_types[colNames[ind]])
if new_types[colNames[ind]] == "object":
diff = new_pandas_frame[colNames[ind]] == original_pandas_frame[colNames[ind]]
if not diff.all(): # difference caused by the presence of NAs
newSeries = pd.Series(new_pandas_frame[colNames[ind]])
newNA = newSeries.isna()
oldSeries = pd.Series(original_pandas_frame[colNames[ind]])
oldNA = oldSeries.isna()
assert (newNA==oldNA).all()
else:
diff = (new_pandas_frame[colNames[ind]] - original_pandas_frame[colNames[ind]]).abs()
assert diff.max() < 1e-10

def single_thread_pandas_conversion(dataset):
with local_context(datatable_disabled=True, polars_disabled=True):
print("converting h2o frame to pandas frame using single thread")
h2oFrame = h2o.import_file(pyunit_utils.locate(dataset))
startT = time.time()
h2oframe_panda = h2oFrame.as_data_frame()
newTime = time.time()-startT
print("H2O frame to Pandas frame conversion time with single thread for dataset {1}: {0}".format(newTime, dataset))
return h2oframe_panda

# if datatable or polars/pyarrow is installed, this test will show that using datatable to convert h2o frame to pandas
# frame is much faster for large datasets.
def test_polars_datatable_2_pandas():
file1 = "bigdata/laptop/jira/PUBDEV_5266_merge_with_string_columns/PUBDEV_5266_f1.csv"
if pyunit_utils.can_install_datatable() or (pyunit_utils.can_install_polars() and pyunit_utils.can_install_pyarrow()):
original_converted_frame1 = single_thread_pandas_conversion(file1) # need to run conversion in single thread

with local_context(polars_disabled=True): # run with datatable
if pyunit_utils.can_install_datatable():
if not(can_use_datatable()):
pyunit_utils.install("datatable")
wendycwong marked this conversation as resolved.
Show resolved Hide resolved
print("test data frame conversion using datatable.")
test_frame_conversion(file1, original_converted_frame1, "datatable")
pyunit_utils.uninstall("datatable")
wendycwong marked this conversation as resolved.
Show resolved Hide resolved
else:
print("datatable is not available. Skipping tests using datatable.")

with local_context(datatable_disabled=True):
if pyunit_utils.can_install_polars() and pyunit_utils.can_install_pyarrow():
if not(can_use_polars()):
pyunit_utils.install('polars')

if not(can_use_pyarrow()):
pyunit_utils.install('pyarrow')
wendycwong marked this conversation as resolved.
Show resolved Hide resolved

print("test data frame conversion using polars and pyarrow.")
test_frame_conversion(file1, original_converted_frame1, "polars and pyarrow")
else:
print("polars, pyarrow are not available. Skipping tests using polars and pyarrow")
else:
print("datatable or polars and pyarrow are not available to test. Skipping tests using polars and pyarrow.")

if __name__ == "__main__":
pyunit_utils.standalone_test(test_polars_datatable_2_pandas)
else:
test_polars_datatable_2_pandas()
Loading