diff --git a/featuretools/computational_backends/api.py b/featuretools/computational_backends/api.py
index 07c7e922bb..befa67a460 100644
--- a/featuretools/computational_backends/api.py
+++ b/featuretools/computational_backends/api.py
@@ -1,9 +1,12 @@
 # flake8: noqa
 from .calculate_feature_matrix import (
     approximate_features,
+    calculate_feature_matrix
+)
+from .pandas_backend import PandasBackend
+from .utils import (
     bin_cutoff_times,
     calc_num_per_chunk,
-    calculate_feature_matrix,
+    create_client_and_cluster,
     get_next_chunk
 )
-from .pandas_backend import PandasBackend
diff --git a/featuretools/computational_backends/calculate_feature_matrix.py b/featuretools/computational_backends/calculate_feature_matrix.py
index 1f607bba00..12fec9a21f 100644
--- a/featuretools/computational_backends/calculate_feature_matrix.py
+++ b/featuretools/computational_backends/calculate_feature_matrix.py
@@ -5,27 +5,28 @@
 import os
 import shutil
 import time
-import warnings
 from builtins import zip
 from collections import defaultdict
 from datetime import datetime
-from functools import wraps
 
 import cloudpickle
 import numpy as np
 import pandas as pd
-import psutil
-from pandas.tseries.frequencies import to_offset
 
 from .pandas_backend import PandasBackend
-
-from featuretools.primitives import (
-    AggregationPrimitive,
-    DirectFeature,
-    PrimitiveBase
+from .utils import (
+    bin_cutoff_times,
+    calc_num_per_chunk,
+    create_client_and_cluster,
+    gather_approximate_features,
+    gen_empty_approx_features_df,
+    get_next_chunk,
+    save_csv_decorator
 )
+
+from featuretools.primitives import AggregationPrimitive, PrimitiveBase
 from featuretools.utils.gen_utils import make_tqdm_iterator
-from featuretools.utils.wrangle import _check_time_type, _check_timedelta
+from featuretools.utils.wrangle import _check_time_type
 from featuretools.variable_types import DatetimeTimeIndex, NumericTimeIndex
 
 logger = logging.getLogger('featuretools.computational_backend')
@@ -366,38 +367,6 @@ def calc_results(time_last, ids, precalculated_features=None, training_window=No
     return feature_matrix
 
 
-def bin_cutoff_times(cuttoff_time, bin_size):
-    binned_cutoff_time = cuttoff_time.copy()
-    if type(bin_size) == int:
-        binned_cutoff_time['time'] = binned_cutoff_time['time'].apply(lambda x: x / bin_size * bin_size)
-    else:
-        bin_size = _check_timedelta(bin_size).get_pandas_timedelta()
-        binned_cutoff_time['time'] = datetime_round(binned_cutoff_time['time'], bin_size)
-    return binned_cutoff_time
-
-
-def save_csv_decorator(save_progress=None):
-    def inner_decorator(method):
-        @wraps(method)
-        def wrapped(*args, **kwargs):
-            if save_progress is None:
-                r = method(*args, **kwargs)
-            else:
-                time = args[0].to_pydatetime()
-                file_name = 'ft_' + time.strftime("%Y_%m_%d_%I-%M-%S-%f") + '.csv'
-                file_path = os.path.join(save_progress, file_name)
-                temp_dir = os.path.join(save_progress, 'temp')
-                if not os.path.exists(temp_dir):
-                    os.makedirs(temp_dir)
-                temp_file_path = os.path.join(temp_dir, file_name)
-                r = method(*args, **kwargs)
-                r.to_csv(temp_file_path)
-                os.rename(temp_file_path, file_path)
-            return r
-        return wrapped
-    return inner_decorator
-
-
 def approximate_features(features, cutoff_time, window, entityset, backend,
                          training_window=None, profile=None):
     '''Given a list of features and cutoff_times to be passed to
@@ -518,141 +487,6 @@ def approximate_features(features, cutoff_time, window, entityset, backend,
     return approx_fms_by_entity, all_approx_feature_set
 
 
-def datetime_round(dt, freq, round_up=False):
-    """
-    Taken from comments on the Pandas source: https://github.com/pandas-dev/pandas/issues/4314
-
-    round down Timestamp series to a specified freq
-    """
-    if round_up:
-        round_f = np.ceil
-    else:
-        round_f = np.floor
-    dt = pd.DatetimeIndex(dt)
-    freq = to_offset(freq).delta.value
-    return pd.DatetimeIndex(((round_f(dt.asi8 / freq)) * freq).astype(np.int64))
-
-
-def gather_approximate_features(features, backend):
-    approximate_by_entity = defaultdict(list)
-    approximate_feature_set = set([])
-    for feature in features:
-        if backend.feature_tree.uses_full_entity(feature):
-            continue
-        if isinstance(feature, DirectFeature):
-            base_feature = feature.base_features[0]
-            while not isinstance(base_feature, AggregationPrimitive):
-                if isinstance(base_feature, DirectFeature):
-                    base_feature = base_feature.base_features[0]
-                else:
-                    break
-            if isinstance(base_feature, AggregationPrimitive):
-                approx_entity = base_feature.entity.id
-                approximate_by_entity[approx_entity].append(base_feature)
-                approximate_feature_set.add(base_feature.hash())
-    return approximate_by_entity, approximate_feature_set
-
-
-def gen_empty_approx_features_df(approx_features):
-    approx_entity_id = approx_features[0].entity.id
-    df = pd.DataFrame(columns=[f.get_name() for f in approx_features])
-    df.index.name = approx_features[0].entity.index
-    approx_fms_by_entity = {approx_entity_id: df}
-    return approx_fms_by_entity
-
-
-def calc_num_per_chunk(chunk_size, shape):
-    """
-    Given a chunk size and the shape of the feature matrix to split into
-    chunk, returns the number of rows there should be per chunk
-    """
-    if isinstance(chunk_size, float) and chunk_size > 0 and chunk_size < 1:
-        num_per_chunk = int(shape[0] * float(chunk_size))
-        # must be at least 1 cutoff per chunk
-        num_per_chunk = max(1, num_per_chunk)
-    elif isinstance(chunk_size, int) and chunk_size >= 1:
-        if chunk_size > shape[0]:
-            warnings.warn("Chunk size is greater than size of feature matrix")
-            num_per_chunk = shape[0]
-        else:
-            num_per_chunk = chunk_size
-    elif chunk_size is None:
-        num_per_chunk = max(int(shape[0] * .1), 10)
-    elif chunk_size == "cutoff time":
-        num_per_chunk = "cutoff time"
-    else:
-        raise ValueError("chunk_size must be None, a float between 0 and 1,"
-                         "a positive integer, or the string 'cutoff time'")
-    return num_per_chunk
-
-
-def get_next_chunk(cutoff_time, time_variable, num_per_chunk):
-    """
-    Generator function that takes a DataFrame of cutoff times and the number of
-    rows to include per chunk and returns an iterator of the resulting chunks.
-
-    Args:
-        cutoff_time (pd.DataFrame): dataframe of cutoff times to chunk
-        time_variable (str): name of time column in cutoff_time dataframe
-        num_per_chunk (int): maximum number of rows to include in a chunk
-    """
-    # if chunk_size is 100%, return DataFrame immediately and stop iteration
-    if cutoff_time.shape[0] <= num_per_chunk:
-        yield cutoff_time
-        return
-
-    # split rows of cutoff_time into groups based on time variable
-    grouped = cutoff_time.groupby(time_variable, sort=False)
-
-    # sort groups by size, largest first
-    groups = grouped.size().sort_values(ascending=False).index
-
-    # list of partially filled chunks
-    chunks = []
-
-    # iterate through each group and try to make completely filled chunks
-    for group_name in groups:
-        # get locations in cutoff_time (iloc) of all rows in group
-        group = grouped.groups[group_name].values.tolist()
-
-        # divide up group into slices if too large to fit in a single chunk
-        group_slices = []
-        if len(group) > num_per_chunk:
-            for i in range(0, len(group), num_per_chunk):
-                group_slices.append(group[i: i + num_per_chunk])
-        else:
-            group_slices.append(group)
-
-        # for each slice of the group, try to find a chunk it can fit in
-        for group_slice in group_slices:
-            # if slice is exactly the number of rows for a chunk, yield the
-            # slice's rows of cutoff_time as the next chunk and move on
-            if len(group_slice) == num_per_chunk:
-                yield cutoff_time.loc[group_slice]
-                continue
-
-            # if not, look for partially filled chunks that have room
-            found_chunk = False
-            for i in range(len(chunks)):
-                chunk = chunks[i]
-                if len(chunk) + len(group_slice) <= num_per_chunk:
-                    chunk.extend(group_slice)
-                    found_chunk = True
-                    if len(chunk) == num_per_chunk:
-                        # if chunk is full, pop from partial list and yield
-                        loc_list = chunks.pop(i)
-                        yield cutoff_time.loc[loc_list]
-                    break
-
-            # if no chunk has room, this slice becomes another partial chunk
-            if not found_chunk:
-                chunks.append(group_slice)
-
-    # after iterating through every group, yield any remaining partial chunks
-    for chunk in chunks:
-        yield cutoff_time.loc[chunk]
-
-
 def linear_calculate_chunks(chunks, features, approximate, training_window,
                             profile, verbose, save_progress, entityset,
                             no_unapproximated_aggs, cutoff_df_time_var,
@@ -691,35 +525,15 @@ def parallel_calculate_chunks(chunks, features, approximate, training_window,
                               verbose, save_progress, entityset, n_jobs,
                               no_unapproximated_aggs, cutoff_df_time_var,
                               target_time, pass_columns, dask_kwargs=None):
-    from distributed import Client, LocalCluster, as_completed
+    from distributed import as_completed
     from dask.base import tokenize
 
     client = None
    cluster = None
     try:
-        if 'cluster' in dask_kwargs:
-            cluster = dask_kwargs['cluster']
-        else:
-            diagnostics_port = None
-            if 'diagnostics_port' in dask_kwargs:
-                diagnostics_port = dask_kwargs['diagnostics_port']
-                del dask_kwargs['diagnostics_port']
-
-            workers = n_jobs_to_workers(n_jobs)
-            workers = min(workers, len(chunks))
-            cluster = LocalCluster(n_workers=workers,
-                                   threads_per_worker=1,
-                                   diagnostics_port=diagnostics_port,
-                                   **dask_kwargs)
-            # if cluster has bokeh port, notify user if unxepected port number
-            if diagnostics_port is not None:
-                if hasattr(cluster, 'scheduler') and cluster.scheduler:
-                    info = cluster.scheduler.identity()
-                    if 'bokeh' in info['services']:
-                        msg = "Dashboard started on port {}"
-                        print(msg.format(info['services']['bokeh']))
-
-        client = Client(cluster)
+        client, cluster = create_client_and_cluster(n_jobs=n_jobs,
+                                                    num_tasks=len(chunks),
+                                                    dask_kwargs=dask_kwargs)
         # scatter the entityset
         # denote future with leading underscore
         start = time.time()
@@ -780,18 +594,3 @@ def parallel_calculate_chunks(chunks, features, approximate, training_window,
             client.close()
 
     return feature_matrix
-
-
-def n_jobs_to_workers(n_jobs):
-    try:
-        cpus = len(psutil.Process().cpu_affinity())
-    except AttributeError:
-        cpus = psutil.cpu_count()
-
-    if n_jobs < 0:
-        workers = max(cpus + 1 + n_jobs, 1)
-    else:
-        workers = min(n_jobs, cpus)
-
-    assert workers > 0, "Need at least one worker"
-    return workers
diff --git a/featuretools/computational_backends/utils.py b/featuretools/computational_backends/utils.py
new file mode 100644
index 0000000000..b0aaf81b53
--- /dev/null
+++ b/featuretools/computational_backends/utils.py
@@ -0,0 +1,225 @@
+import os
+import warnings
+from collections import defaultdict
+from functools import wraps
+
+import numpy as np
+import pandas as pd
+import psutil
+from distributed import Client, LocalCluster
+from pandas.tseries.frequencies import to_offset
+
+from featuretools.primitives import AggregationPrimitive, DirectFeature
+from featuretools.utils.wrangle import _check_timedelta
+
+
+def bin_cutoff_times(cuttoff_time, bin_size):
+    binned_cutoff_time = cuttoff_time.copy()
+    if type(bin_size) == int:
+        binned_cutoff_time['time'] = binned_cutoff_time['time'].apply(lambda x: x / bin_size * bin_size)
+    else:
+        bin_size = _check_timedelta(bin_size).get_pandas_timedelta()
+        binned_cutoff_time['time'] = datetime_round(binned_cutoff_time['time'], bin_size)
+    return binned_cutoff_time
+
+
+def save_csv_decorator(save_progress=None):
+    def inner_decorator(method):
+        @wraps(method)
+        def wrapped(*args, **kwargs):
+            if save_progress is None:
+                r = method(*args, **kwargs)
+            else:
+                time = args[0].to_pydatetime()
+                file_name = 'ft_' + time.strftime("%Y_%m_%d_%I-%M-%S-%f") + '.csv'
+                file_path = os.path.join(save_progress, file_name)
+                temp_dir = os.path.join(save_progress, 'temp')
+                if not os.path.exists(temp_dir):
+                    os.makedirs(temp_dir)
+                temp_file_path = os.path.join(temp_dir, file_name)
+                r = method(*args, **kwargs)
+                r.to_csv(temp_file_path)
+                os.rename(temp_file_path, file_path)
+            return r
+        return wrapped
+    return inner_decorator
+
+
+def datetime_round(dt, freq, round_up=False):
+    """
+    Taken from comments on the Pandas source: https://github.com/pandas-dev/pandas/issues/4314
+
+    round down Timestamp series to a specified freq
+    """
+    if round_up:
+        round_f = np.ceil
+    else:
+        round_f = np.floor
+    dt = pd.DatetimeIndex(dt)
+    freq = to_offset(freq).delta.value
+    return pd.DatetimeIndex(((round_f(dt.asi8 / freq)) * freq).astype(np.int64))
+
+
+def gather_approximate_features(features, backend):
+    approximate_by_entity = defaultdict(list)
+    approximate_feature_set = set([])
+    for feature in features:
+        if backend.feature_tree.uses_full_entity(feature):
+            continue
+        if isinstance(feature, DirectFeature):
+            base_feature = feature.base_features[0]
+            while not isinstance(base_feature, AggregationPrimitive):
+                if isinstance(base_feature, DirectFeature):
+                    base_feature = base_feature.base_features[0]
+                else:
+                    break
+            if isinstance(base_feature, AggregationPrimitive):
+                approx_entity = base_feature.entity.id
+                approximate_by_entity[approx_entity].append(base_feature)
+                approximate_feature_set.add(base_feature.hash())
+    return approximate_by_entity, approximate_feature_set
+
+
+def gen_empty_approx_features_df(approx_features):
+    approx_entity_id = approx_features[0].entity.id
+    df = pd.DataFrame(columns=[f.get_name() for f in approx_features])
+    df.index.name = approx_features[0].entity.index
+    approx_fms_by_entity = {approx_entity_id: df}
+    return approx_fms_by_entity
+
+
+def calc_num_per_chunk(chunk_size, shape):
+    """
+    Given a chunk size and the shape of the feature matrix to split into
+    chunk, returns the number of rows there should be per chunk
+    """
+    if isinstance(chunk_size, float) and chunk_size > 0 and chunk_size < 1:
+        num_per_chunk = int(shape[0] * float(chunk_size))
+        # must be at least 1 cutoff per chunk
+        num_per_chunk = max(1, num_per_chunk)
+    elif isinstance(chunk_size, int) and chunk_size >= 1:
+        if chunk_size > shape[0]:
+            warnings.warn("Chunk size is greater than size of feature matrix")
+            num_per_chunk = shape[0]
+        else:
+            num_per_chunk = chunk_size
+    elif chunk_size is None:
+        num_per_chunk = max(int(shape[0] * .1), 10)
+    elif chunk_size == "cutoff time":
+        num_per_chunk = "cutoff time"
+    else:
+        raise ValueError("chunk_size must be None, a float between 0 and 1,"
+                         "a positive integer, or the string 'cutoff time'")
+    return num_per_chunk
+
+
+def get_next_chunk(cutoff_time, time_variable, num_per_chunk):
+    """
+    Generator function that takes a DataFrame of cutoff times and the number of
+    rows to include per chunk and returns an iterator of the resulting chunks.
+
+    Args:
+        cutoff_time (pd.DataFrame): dataframe of cutoff times to chunk
+        time_variable (str): name of time column in cutoff_time dataframe
+        num_per_chunk (int): maximum number of rows to include in a chunk
+    """
+    # if chunk_size is 100%, return DataFrame immediately and stop iteration
+    if cutoff_time.shape[0] <= num_per_chunk:
+        yield cutoff_time
+        return
+
+    # split rows of cutoff_time into groups based on time variable
+    grouped = cutoff_time.groupby(time_variable, sort=False)
+
+    # sort groups by size, largest first
+    groups = grouped.size().sort_values(ascending=False).index
+
+    # list of partially filled chunks
+    chunks = []
+
+    # iterate through each group and try to make completely filled chunks
+    for group_name in groups:
+        # get locations in cutoff_time (iloc) of all rows in group
+        group = grouped.groups[group_name].values.tolist()
+
+        # divide up group into slices if too large to fit in a single chunk
+        group_slices = []
+        if len(group) > num_per_chunk:
+            for i in range(0, len(group), num_per_chunk):
+                group_slices.append(group[i: i + num_per_chunk])
+        else:
+            group_slices.append(group)
+
+        # for each slice of the group, try to find a chunk it can fit in
+        for group_slice in group_slices:
+            # if slice is exactly the number of rows for a chunk, yield the
+            # slice's rows of cutoff_time as the next chunk and move on
+            if len(group_slice) == num_per_chunk:
+                yield cutoff_time.loc[group_slice]
+                continue
+
+            # if not, look for partially filled chunks that have room
+            found_chunk = False
+            for i in range(len(chunks)):
+                chunk = chunks[i]
+                if len(chunk) + len(group_slice) <= num_per_chunk:
+                    chunk.extend(group_slice)
+                    found_chunk = True
+                    if len(chunk) == num_per_chunk:
+                        # if chunk is full, pop from partial list and yield
+                        loc_list = chunks.pop(i)
+                        yield cutoff_time.loc[loc_list]
+                    break
+
+            # if no chunk has room, this slice becomes another partial chunk
+            if not found_chunk:
+                chunks.append(group_slice)
+
+    # after iterating through every group, yield any remaining partial chunks
+    for chunk in chunks:
+        yield cutoff_time.loc[chunk]
+
+
+def n_jobs_to_workers(n_jobs):
+    try:
+        cpus = len(psutil.Process().cpu_affinity())
+    except AttributeError:
+        cpus = psutil.cpu_count()
+
+    # Taken from sklearn parallel_backends code
+    # https://github.com/scikit-learn/scikit-learn/blob/27bbdb570bac062c71b3bb21b0876fd78adc9f7e/sklearn/externals/joblib/_parallel_backends.py#L120
+    if n_jobs < 0:
+        workers = max(cpus + 1 + n_jobs, 1)
+    else:
+        workers = min(n_jobs, cpus)
+
+    assert workers > 0, "Need at least one worker"
+    return workers
+
+
+def create_client_and_cluster(n_jobs, num_tasks, dask_kwargs):
+    cluster = None
+    if 'cluster' in dask_kwargs:
+        cluster = dask_kwargs['cluster']
+    else:
+        diagnostics_port = None
+        if 'diagnostics_port' in dask_kwargs:
+            diagnostics_port = dask_kwargs['diagnostics_port']
+            del dask_kwargs['diagnostics_port']
+
+        workers = n_jobs_to_workers(n_jobs)
+        workers = min(workers, num_tasks)
+        cluster = LocalCluster(n_workers=workers,
+                               threads_per_worker=1,
+                               diagnostics_port=diagnostics_port,
+                               **dask_kwargs)
+        # if cluster has bokeh port, notify user if unxepected port number
+        if diagnostics_port is not None:
+            if hasattr(cluster, 'scheduler') and cluster.scheduler:
+                info = cluster.scheduler.identity()
+                if 'bokeh' in info['services']:
+                    msg = "Dashboard started on port {}"
+                    print(msg.format(info['services']['bokeh']))
+
+    client = Client(cluster)
+    return client, cluster
diff --git a/featuretools/tests/computational_backend/test_calculate_feature_matrix.py b/featuretools/tests/computational_backend/test_calculate_feature_matrix.py
index 30033efde3..958f2d5266 100644
--- a/featuretools/tests/computational_backend/test_calculate_feature_matrix.py
+++ b/featuretools/tests/computational_backend/test_calculate_feature_matrix.py
@@ -18,9 +18,10 @@
 from ..testing_utils import make_ecommerce_entityset
 
 from featuretools import EntitySet, Timedelta, calculate_feature_matrix, dfs
-from featuretools.computational_backends.calculate_feature_matrix import (
+from featuretools.computational_backends.utils import (
     bin_cutoff_times,
     calc_num_per_chunk,
+    create_client_and_cluster,
     get_next_chunk,
     n_jobs_to_workers
 )
@@ -837,6 +838,32 @@ def test_dask_persisted_entityset(entityset, capsys):
     assert (feature_matrix == labels).values.all()
 
+
+def test_create_client_and_cluster(entityset, monkeypatch):
+    def test_cluster(n_workers=1,
+                     threads_per_worker=1,
+                     diagnostics_port=8787,
+                     **dask_kwarg):
+        return (n_workers, threads_per_worker, diagnostics_port)
+    monkeypatch.setitem(create_client_and_cluster.__globals__, 'LocalCluster', test_cluster)
+    monkeypatch.setitem(create_client_and_cluster.__globals__, 'Client', lambda x: x)
+
+    # cluster in dask_kwargs case
+    client, cluster = create_client_and_cluster(n_jobs=2,
+                                                num_tasks=3,
+                                                dask_kwargs={'cluster': 'tcp://127.0.0.1:54321'})
+    assert cluster == 'tcp://127.0.0.1:54321'
+    # jobs < tasks case
+    client, cluster = create_client_and_cluster(n_jobs=2,
+                                                num_tasks=3,
+                                                dask_kwargs={})
+    assert cluster == (2, 1, None)
+    # jobs > tasks case
+    client, cluster = create_client_and_cluster(n_jobs=10,
+                                                num_tasks=3,
+                                                dask_kwargs={'diagnostics_port': 8789})
+    assert cluster == (3, 1, 8789)
+
 
 def test_parallel_failure_raises_correct_error(entityset):
     times = list([datetime(2011, 4, 9, 10, 30, i * 6) for i in range(5)] +
                  [datetime(2011, 4, 9, 10, 31, i * 9) for i in range(4)] +