GH-15887: allow Py H2OFrame constructor to accept an existing H2OFrame #15898

Merged 5 commits on Nov 22, 2023
114 changes: 96 additions & 18 deletions h2o-py/h2o/frame.py
@@ -9,27 +9,26 @@
import datetime
import functools
from io import StringIO
import itertools
import os
import sys
import re
import tempfile
import traceback
from types import FunctionType
import warnings

import h2o
from h2o.base import Keyed
from h2o.display import H2ODisplay, H2ODisplayWrapper, H2OItemsDisplay, H2OTableDisplay, display, in_ipy, in_zep, repr_def
from h2o.display import H2ODisplay, H2ODisplayWrapper, H2OItemsDisplay, H2OTableDisplay, display, repr_def
from h2o.exceptions import H2OTypeError, H2OValueError, H2ODeprecationWarning
from h2o.expr import ExprNode
from h2o.group_by import GroupBy
from h2o.job import H2OJob
from h2o.plot import get_matplotlib_pyplot, decorate_plot_result, RAISE_ON_FIGURE_ACCESS
from h2o.utils.config import get_config_value
from h2o.utils.metaclass import deprecated_fn
from h2o.utils.shared_utils import (_handle_numpy_array, _handle_pandas_data_frame, _handle_python_dicts,
_handle_python_lists, _is_list, _is_str_list, _py_tmp_key, _quoted,
can_use_pandas, can_use_numpy, quote, normalize_slice, slice_is_normalized,
check_frame_id, can_use_datatable)
from h2o.utils.shared_utils import(gen_header, is_list, is_list_of_lists, is_str_list, py_tmp_key, quoted,
can_use_pandas, can_use_numpy, quote, normalize_slice, slice_is_normalized,
check_frame_id, can_use_datatable)
from h2o.utils.threading import local_context, local_env
from h2o.utils.typechecks import (assert_is_type, assert_satisfies, Enum, I, is_type, numeric, numpy_ndarray,
numpy_datetime, pandas_dataframe, pandas_timestamp, scipy_sparse, U)
@@ -60,6 +59,7 @@ class H2OFrame(Keyed, H2ODisplay):
- A Pandas dataframe, or a Numpy ndarray: create a matching H2OFrame.
- A Scipy sparse matrix: create a matching sparse H2OFrame.

:param str destination_frame: (internal) name of the target DKV key in the H2O backend.
:param int header: if ``python_obj`` is a list of lists, this parameter can be used to indicate whether the
first row of the data represents headers. The value of -1 means the first row is data, +1 means the first
row is the headers, 0 (default) allows H2O to guess whether the first row contains data or headers.
@@ -71,7 +71,6 @@ class H2OFrame(Keyed, H2ODisplay):
types for only few columns, and let H2O choose the types of the rest.
:param na_strings: List of strings in the input data that should be interpreted as missing values. This could
be given on a per-column basis, either as a list-of-lists, or as a dictionary {column name: list of nas}.
:param str destination_frame: (internal) name of the target DKV key in the H2O backend.
:param str separator: (deprecated)

:example:
@@ -94,7 +93,7 @@ def __init__(self, python_obj=None, destination_frame=None, header=0, separator=

coltype = U(None, "unknown", "uuid", "string", "float", "real", "double", "int", "long", "numeric",
"categorical", "factor", "enum", "time")
assert_is_type(python_obj, None, list, tuple, dict, numpy_ndarray, pandas_dataframe, scipy_sparse)
assert_is_type(python_obj, None, list, tuple, dict, numpy_ndarray, pandas_dataframe, scipy_sparse, H2OFrame)
assert_is_type(destination_frame, None, str)
assert_is_type(header, -1, 0, 1)
assert_is_type(separator, I(str, lambda s: len(s) == 1))
@@ -103,12 +102,23 @@ def __init__(self, python_obj=None, destination_frame=None, header=0, separator=
assert_is_type(na_strings, None, [str], [[str]], {str: [str]})
check_frame_id(destination_frame)

        self._ex = ExprNode()
        self._ex._children = None
        self._is_frame = True  # Indicate that this is an actual frame, allowing typechecks to be made
        if python_obj is not None:
            self._upload_python_object(python_obj, destination_frame, header, separator,
                                       column_names, column_types, na_strings, skipped_columns, force_col_types)
        if isinstance(python_obj, H2OFrame):
            sc = h2o.h2o.shallow_copy(python_obj, destination_frame)
            if skipped_columns:
Contributor:
@sebhrusen: I am confused here. The user wants to convert a pandas frame to an H2OFrame. If the frame is already an H2OFrame, wouldn't it be easier to just return it? I was thinking this is just a simple check, like for the as_data_frame method....

Contributor Author:
if the user is using a constructor like H2OFrame, I don't think it's a good idea to return the exact same frame. That's why I return a shallow copy instead: it's cheap (no data copy) but it's not the same object.

Contributor:
You're worried about the user changing the original frame.

Contributor Author:
> You're worried about the user changing the original frame.

it's one concern, but not the major one.
I'm more concerned about user expectations: H2OFrame(…) is a constructor, not a factory, and therefore I think it should always return a fresh instance.
It could be implemented as a factory but it's not:

  • if you call H2OFrame([1, 2, 3]) twice, you get 2 different objects.
  • if you call H2OFrame(h2o_fr, skipped_columns=['foo', 'bar']), you expect a filtered version of the given frame and it will be a new object.
  • if you call H2OFrame(h2o_fr, skipped_columns=['do_not_exist']), you expect the same as above, but as the columns don't exist, you will get a new copy of the given frame.
  • therefore, if you just call H2OFrame(h2o_fr) it's more natural to still return a new instance, and therefore a copy (shallow because there's no reason to deep clone here). If we were returning the given frame directly here, it would be an unnecessary special case that may confuse the user.
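
A minimal sketch of the constructor semantics described above, assuming a running H2O cluster (started with h2o.init()) and hypothetical column names:

import h2o
from h2o.frame import H2OFrame

h2o.init()

# small frame built from a list of lists; "num" and "txt" are made-up column names
fr = H2OFrame([[1, "a"], [2, "b"], [3, "c"]], column_names=["num", "txt"])

# passing an existing H2OFrame returns a fresh instance backed by a shallow copy
fr2 = H2OFrame(fr)
assert fr2 is not fr

# with skipped_columns, the copy is filtered (column index 1 is "txt") and the original is untouched
fr3 = H2OFrame(fr, skipped_columns=[1])
assert fr3.names == ["num"]
assert fr.names == ["num", "txt"]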

Contributor Author:
@wendycwong

> I am confused here. The user wants to convert a pandas frame to an H2OFrame

well, that's what he wants to do when passing a pandas frame, but who knows what he wants to do when passing an H2OFrame? It's an edge case.

Maybe to convince you, here is how pandas itself behaves:

In [24]: import pandas as pd

In [25]: df = pd.DataFrame([1, 2, 3])

In [26]: id(df)
Out[26]: 5090919696

In [27]: df2 = pd.DataFrame([1, 2, 3])

In [28]: id(df2)
Out[28]: 5061990032

In [29]: df3 = pd.DataFrame(df)

In [30]: id(df3)
Out[30]: 5043948112

you see: df3 is not df (it's a different object)

                sc = sc.drop(skipped_columns)
            if column_names:
                sc.set_names(column_names)
            if destination_frame is not None and destination_frame != sc.key:
                h2o.assign(sc, destination_frame)
            self._ex = sc._ex
        else:
            self._ex = ExprNode()
            self._ex._children = None
            if python_obj is not None:
                self._upload_python_object(python_obj, destination_frame, header, separator,
                                           column_names, column_types, na_strings, skipped_columns, force_col_types)

@staticmethod
def _expr(expr, cache=None):
@@ -160,7 +170,7 @@ def _upload_sparse_matrix(self, matrix, destination_frame=None):
tmp_handle, tmp_path = tempfile.mkstemp(suffix=".svmlight")
out = os.fdopen(tmp_handle, 'wt', **H2OFrame.__fdopen_kwargs)
if destination_frame is None:
destination_frame = _py_tmp_key(h2o.connection().session_id)
destination_frame = py_tmp_key(h2o.connection().session_id)

# sp.find(matrix) returns (row indices, column indices, values) of the non-zero elements of A. Unfortunately
# there is no guarantee that those elements are returned in the correct order, so need to sort
@@ -490,7 +500,7 @@ def _parse_raw(self, setup):
p.update({k: v for k, v in setup.items() if k in p})

# Extract only 'name' from each src in the array of srcs
p['source_frames'] = [_quoted(src['name']) for src in setup['source_frames']]
p['source_frames'] = [quoted(src['name']) for src in setup['source_frames']]

H2OJob(h2o.api("POST /3/Parse", data=p), "Parse").poll()
# Need to return a Frame here for nearly all callers
@@ -706,7 +716,7 @@ def __pow__(self, rhs):
return _binop(self, "^", rhs)

def __contains__(self, lhs):
return all((t == self).any() for t in lhs) if _is_list(lhs) else (lhs == self).any()
return all((t == self).any() for t in lhs) if is_list(lhs) else (lhs == self).any()

# rops
def __rmod__(self, lhs):
@@ -2164,7 +2174,7 @@ def _compute_ncol_update(self, item):  # computes new ncol, names, and types
new_ncols = -1
if isinstance(item, list):
new_ncols = len(item)
if _is_str_list(item):
if is_str_list(item):
new_types = {k: self.types[k] for k in item}
new_names = item
else:
@@ -5182,3 +5192,71 @@ def generatePandaEnumCols(pandaFtrain, cname, nrows, domainL):
ftemp = temp[newNames]
ctemp = pd.concat([ftemp, zeroFrame], axis=1)
return ctemp


### Module-scope utility functions ###

def _handle_python_lists(python_obj, check_header):
    # convert all inputs to lol
    if is_list_of_lists(python_obj):  # do we have a list of lists: [[...], ..., [...]] ?
        ncols = _check_lists_of_lists(python_obj)  # must be a list of flat lists, raise ValueError if not
    elif isinstance(python_obj, (list, tuple)):  # single list
        ncols = 1
        python_obj = [[e] for e in python_obj]
    else:  # scalar
        python_obj = [[python_obj]]
        ncols = 1
    # create the header
    if check_header == 1:
        header = python_obj[0]
        python_obj = python_obj[1:]
    else:
        header = gen_header(ncols)
    # shape up the data for csv.DictWriter
    # data_to_write = [dict(list(zip(header, row))) for row in python_obj]
    return header, python_obj


def _handle_python_dicts(python_obj, check_header):
    header = list(python_obj.keys()) if python_obj else gen_header(1)
    is_valid = all(re.match(r"^[a-zA-Z_][a-zA-Z0-9_.]*$", col) for col in header)  # is this a valid header?
    if not is_valid:
        raise ValueError(
            "Did not get a valid set of column names! Must match the regular expression: ^[a-zA-Z_][a-zA-Z0-9_.]*$ ")
    for k in python_obj:  # check that each value entry is a flat list/tuple or single int, float, or string
        v = python_obj[k]
        if isinstance(v, (tuple, list)):  # if value is a tuple/list, then it must be flat
            if is_list_of_lists(v):
                raise ValueError("Values in the dictionary must be flattened!")
        elif is_type(v, str, numeric):
            python_obj[k] = [v]
        else:
            raise ValueError("Encountered invalid dictionary value when constructing H2OFrame. Got: {0}".format(v))

    zipper = getattr(itertools, "zip_longest", None) or getattr(itertools, "izip_longest", None) or zip
    rows = list(map(list, zipper(*list(python_obj.values()))))
    data_to_write = [dict(list(zip(header, row))) for row in rows]
    return header, data_to_write

def _handle_numpy_array(python_obj, header):
    return _handle_python_lists(python_obj.tolist(), header)


def _handle_pandas_data_frame(python_obj, header):
    data = _handle_python_lists(python_obj.values.tolist(), -1)[1]
    return list(str(c) for c in python_obj.columns), data


def _check_lists_of_lists(python_obj):
    # check we have a lists of flat lists
    # returns longest length of sublist
    most_cols = 1
    for l in python_obj:
        # All items in the list must be a list!
        if not isinstance(l, (tuple, list)):
            raise ValueError("`python_obj` is a mixture of nested lists and other types.")
        most_cols = max(most_cols, len(l))
        for ll in l:
            # in fact, we must have a list of flat lists!
            if isinstance(ll, (tuple, list)):
                raise ValueError("`python_obj` is not a list of flat lists!")
    return most_cols

17 changes: 13 additions & 4 deletions h2o-py/h2o/h2o.py
@@ -834,7 +834,8 @@ def parse_setup(raw_frames, destination_frame=None, header=0, separator=None, co
if is_type(raw_frames, str): raw_frames = [raw_frames]

# temporary dictionary just to pass the following information to the parser: header, separator
kwargs = {"check_header": header, "source_frames": [quoted(frame_id) for frame_id in raw_frames],
kwargs = {"check_header": header,
"source_frames": [quoted(frame_id) for frame_id in raw_frames],
"single_quotes": quotechar == "'"}
if separator:
kwargs["separator"] = ord(separator)
@@ -844,6 +845,7 @@

if custom_non_data_line_markers is not None:
kwargs["custom_non_data_line_markers"] = custom_non_data_line_markers

if partition_by is not None:
kwargs["partition_by"] = partition_by

@@ -1034,9 +1036,16 @@ def deep_copy(data, xid):
    assert_satisfies(xid, xid != data.frame_id)
    check_frame_id(xid)
    duplicate = data.apply(lambda x: x)
    duplicate._ex = ExprNode("assign", xid, duplicate)._eval_driver(None)
    duplicate._ex._cache._id = xid
    duplicate._ex._children = None
    assign(duplicate, xid)
    return duplicate


def shallow_copy(data, xid):
    assert_is_type(data, H2OFrame)
    assert_is_type(xid, None, str)
    duplicate = H2OFrame._expr(expr=ExprNode("cols_py", data, slice(data.ncols)))
    if xid is not None:
        assert_satisfies(xid, xid != data.frame_id)
        assign(duplicate, xid)
    return duplicate
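
A short usage sketch contrasting the new shallow_copy with the existing deep_copy (a hedged illustration: it assumes a running cluster, reaches shallow_copy as h2o.h2o.shallow_copy to mirror the call site in frame.py above, and the destination keys are made up):

import h2o

h2o.init()
fr = h2o.H2OFrame([[1, 2], [3, 4]])

# shallow copy: a new frame key over the same columns, no data copy in the backend
sc = h2o.h2o.shallow_copy(fr, "fr_shallow")
assert sc.frame_id == "fr_shallow"

# deep copy: materializes an independent copy of the data under the new key
dc = h2o.deep_copy(fr, "fr_deep")
assert dc.frame_id == "fr_deep"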


74 changes: 0 additions & 74 deletions h2o-py/h2o/utils/shared_utils.py
@@ -162,43 +162,6 @@ def _gen_header(cols):
return ["C" + str(c) for c in range(1, cols + 1, 1)]


def _check_lists_of_lists(python_obj):
Contributor Author:
Just moved those functions with import business logic to frame.py, where they belong.
shared_utils should ideally only contain simple utility functions that can be used anywhere, nothing too specific to a business module, otherwise it gets messy.

# check we have a lists of flat lists
# returns longest length of sublist
most_cols = 1
for l in python_obj:
# All items in the list must be a list!
if not isinstance(l, (tuple, list)):
raise ValueError("`python_obj` is a mixture of nested lists and other types.")
most_cols = max(most_cols, len(l))
for ll in l:
# in fact, we must have a list of flat lists!
if isinstance(ll, (tuple, list)):
raise ValueError("`python_obj` is not a list of flat lists!")
return most_cols


def _handle_python_lists(python_obj, check_header):
# convert all inputs to lol
if _is_list_of_lists(python_obj): # do we have a list of lists: [[...], ..., [...]] ?
ncols = _check_lists_of_lists(python_obj) # must be a list of flat lists, raise ValueError if not
elif isinstance(python_obj, (list, tuple)): # single list
ncols = 1
python_obj = [[e] for e in python_obj]
else: # scalar
python_obj = [[python_obj]]
ncols = 1
# create the header
if check_header == 1:
header = python_obj[0]
python_obj = python_obj[1:]
else:
header = _gen_header(ncols)
# shape up the data for csv.DictWriter
# data_to_write = [dict(list(zip(header, row))) for row in python_obj]
return header, python_obj


def stringify_dict(d):
return stringify_list(["{'key': %s, 'value': %s}" % (_quoted(k), v) for k, v in d.items()])

@@ -244,38 +207,6 @@ def _is_num_list(l):
def _is_list_of_lists(o):
return any(isinstance(l, (tuple, list)) for l in o)


def _handle_numpy_array(python_obj, header):
return _handle_python_lists(python_obj.tolist(), header)


def _handle_pandas_data_frame(python_obj, header):
data = _handle_python_lists(python_obj.values.tolist(), -1)[1]
return list(str(c) for c in python_obj.columns), data


def _handle_python_dicts(python_obj, check_header):
header = list(python_obj.keys()) if python_obj else _gen_header(1)
is_valid = all(re.match(r"^[a-zA-Z_][a-zA-Z0-9_.]*$", col) for col in header) # is this a valid header?
if not is_valid:
raise ValueError(
"Did not get a valid set of column names! Must match the regular expression: ^[a-zA-Z_][a-zA-Z0-9_.]*$ ")
for k in python_obj: # check that each value entry is a flat list/tuple or single int, float, or string
v = python_obj[k]
if isinstance(v, (tuple, list)): # if value is a tuple/list, then it must be flat
if _is_list_of_lists(v):
raise ValueError("Values in the dictionary must be flattened!")
elif is_type(v, str, numeric):
python_obj[k] = [v]
else:
raise ValueError("Encountered invalid dictionary value when constructing H2OFrame. Got: {0}".format(v))

zipper = getattr(itertools, "zip_longest", None) or getattr(itertools, "izip_longest", None) or zip
rows = list(map(list, zipper(*list(python_obj.values()))))
data_to_write = [dict(list(zip(header, row))) for row in rows]
return header, data_to_write


def _is_fr(o):
return o.__class__.__name__ == "H2OFrame" # hack to avoid circular imports

@@ -426,14 +357,9 @@ def slice_is_normalized(s):
quoted = _quoted
is_list = _is_list
is_fr = _is_fr
handle_python_dicts = _handle_python_dicts
handle_pandas_data_frame = _handle_pandas_data_frame
handle_numpy_array = _handle_numpy_array
is_list_of_lists = _is_list_of_lists
is_num_list = _is_num_list
is_str_list = _is_str_list
handle_python_lists = _handle_python_lists
check_lists_of_lists = _check_lists_of_lists

gen_model_file_name = "h2o-genmodel.jar"
h2o_predictor_class = "hex.genmodel.tools.PredictCsv"