diff --git a/examples/basic_example_pyHDK.ipynb b/examples/basic_example_pyHDK.ipynb
new file mode 100644
index 000000000..9aea533d0
--- /dev/null
+++ b/examples/basic_example_pyHDK.ipynb
@@ -0,0 +1,209 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import numpy as np\n",
+    "import pyarrow as pa\n",
+    "import pyhdk\n",
+    "import time\n",
+    "\n",
+    "hdk = pyhdk.hdk.HDK(\n",
+    "    debug_logs=\"INFO\"\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Init data\n",
+    "col2_tbl1 = np.array(['red', 'orange', 'yellow', 'green', 'blue'])\n",
+    "col1_tbl1 = np.arange(len(col2_tbl1))\n",
+    "\n",
+    "table1 = pa.Table.from_arrays(\n",
+    "    [pa.array(col1_tbl1, pa.int64()), pa.array(col2_tbl1, pa.string())], \n",
+    "    schema=pa.schema([('ID', pa.int64()), ('color', pa.string())])\n",
+    ")\n",
+    "\n",
+    "table2_nrows = 10_000_000 # with more data, we expect GPU to beat CPU\n",
+    "col1_table2 = np.random.randint(1, 100, size=table2_nrows)\n",
+    "col2_table2 = np.random.randint(1, 100, size=table2_nrows)\n",
+    "col3_table2 = np.random.randint(len(col2_tbl1), size=table2_nrows)\n",
+    "\n",
+    "table2 = pa.Table.from_arrays(\n",
+    "    [pa.array(col1_table2, pa.int64()), pa.array(col2_table2, pa.int64()), pa.array(col3_table2, pa.int64())], \n",
+    "    schema=pa.schema([(\"price\", pa.int64()), ('Region', pa.int64()), ('color_ID', pa.int64())])\n",
+    ")\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "fragment_count = 8\n",
+    "hdk_tbl1 = hdk.import_arrow(table1, \"ht1\", int(np.ceil(table1.num_rows/fragment_count)))\n",
+    "hdk_tbl2 = hdk.import_arrow(table2, \"ht2\", int(np.ceil(table2.num_rows/fragment_count)))"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Note that a cold run may not show significant speedups, because HDK\n",
+    "potentially needs to materialize/build some info about the table and/or the individual columns.\n",
+    "That info, however, will be preserved, and subsequent runs should be faster.\n",
+    "\n",
+    "To get \"fair\" results, do not run the optimized versions back-to-back, as this will reuse the results of previous compilations.\n",
\n", + "\n", + "You could run the unoptimized version before an optimized one to \n", + "wipe the cached plan and get a time that includes compilation.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Can also refragment original tables\n", + "# hdk_tbl3 = hdk_tbl1.refragmented_view(500_000)\n", + "# hdk_tbl4 = hdk.refragmented_view(\"ht2\", 500_000)\n", + "# OR\n", + "# hdk_tbl4 = hdk.refragmented_view(hdk_tbl2, 500_000)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# To see \"fair\" results, you can at first execute all cells and then click \"execute this cell and below\" \n", + "# Independent ops on CPU (dataframe-like, naive and suboptimal)\n", + "join_start = time.perf_counter()\n", + "join_res = hdk_tbl2.join(hdk_tbl1, lhs_cols=\"color_ID\", rhs_cols=\"ID\").run()\n", + "print(f\"Join time: {(time.perf_counter() - join_start):.3f}s\")\n", + "\n", + "sort_start = time.perf_counter()\n", + "sort_res = join_res.sort(fields={\"price\" : \"desc\"}).run()\n", + "print(f\"Sort time: {(time.perf_counter() - sort_start):.3f}s\")\n", + "\n", + "agg_start = time.perf_counter()\n", + "agg_res = sort_res.agg(\"color_ID\", \"avg(price)\").run()\n", + "print(f\"Agg time: {(time.perf_counter() - agg_start):.3f}s\")\n", + "unopt_query_t = time.perf_counter() - join_start\n", + "print(f\"Total time (unopt): {(unopt_query_t):.3f}s\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Combined plan on CPU \n", + "# Giving the compiler more overview of what we want to achieve allows for more optimizations\n", + "\n", + "q_start = time.perf_counter()\n", + "join_res = hdk_tbl2.join(hdk_tbl1, lhs_cols=\"color_ID\", rhs_cols=\"ID\")\n", + "sort_res = join_res.sort(fields={\"price\" : \"desc\"})\n", + "agg_res = sort_res.agg(\"color_ID\", \"avg(price)\").run()\n", + "opt_query_t = time.perf_counter() - q_start\n", + "print(f\"Total time (Combined plan on CPU): {(opt_query_t):.3f}s\")\n", + "print(f\"Speedup: {(unopt_query_t/opt_query_t):.2f}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Combined plan on GPU\n", + "q_start = time.perf_counter()\n", + "join_res = hdk_tbl2.join(hdk_tbl1, lhs_cols=\"color_ID\", rhs_cols=\"ID\")\n", + "sort_res = join_res.sort(fields={\"price\" : \"desc\"})\n", + "agg_res = join_res.agg(\"color_ID\", \"avg(price)\").run(device_type=\"GPU\")\n", + "opt_query_t = time.perf_counter() - q_start\n", + "print(f\"Total time (Combined plan on GPU): {(opt_query_t):.3f}s\")\n", + "print(f\"Speedup: {(unopt_query_t/opt_query_t):.2f}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Indep ops: Join on CPU, Sort and Agg on GPU\n", + "# Q: Why could it be so much slower than fully on either of the devices? 
\n", + "# A: GPU must *fetch intermediate results* of the join each run, whereas in\n", + "# the full-GPU mode GPU can retain columns of the table for the next run\n", + "# via BufferManager and thus only needs to transfer the aggregate back to CPU.\n", + "\n", + "join_start = time.perf_counter()\n", + "join_res = hdk_tbl2.join(hdk_tbl1, lhs_cols=\"color_ID\", rhs_cols=\"ID\").run()\n", + "print(f\"Join time: {(time.perf_counter() - join_start):.3f}s\")\n", + "\n", + "sort_res = join_res.sort(fields={\"price\" : \"desc\"})\n", + "agg_start = time.perf_counter()\n", + "agg_res = sort_res.agg(\"color_ID\", \"avg(price)\").run(device_type=\"GPU\")\n", + "print(f\"Sort+Agg time: {(time.perf_counter() - agg_start):.3f}s\")\n", + "opt_query_t = time.perf_counter() - join_start\n", + "print(f\"Speedup (Join on CPU, Agg on GPU): {(unopt_query_t/opt_query_t):.2f}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# SQL example\n", + "q_start = time.perf_counter()\n", + "hdk.sql(\"SELECT color_ID, AVG(price) \\\n", + " FROM ht2 \\\n", + " JOIN ht1 ON ht1.ID = ht2.color_ID \\\n", + " GROUP BY color_ID \\\n", + " ORDER BY AVG(price) DESC\")\n", + "print(f\"SQL time: {(time.perf_counter() - q_start):.3f}s\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "omnisci-dev", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.16" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/examples/basic_python_usage.ipynb b/examples/basic_python_usage.ipynb index 6984e8765..97e4f8f20 100644 --- a/examples/basic_python_usage.ipynb +++ b/examples/basic_python_usage.ipynb @@ -9,16 +9,12 @@ "source": [ "# Initialization\n", "import pyhdk \n", + "import pandas\n", + "import pyarrow as pa\n", "\n", - "# Uses DBID 1\n", - "pyhdk.initLogger()\n", - "config = pyhdk.buildConfig()\n", - "storage = pyhdk.storage.ArrowStorage(1)\n", - "data_mgr = pyhdk.storage.DataMgr(config)\n", - "data_mgr.registerDataProvider(storage)\n", - "\n", - "calcite = pyhdk.sql.Calcite(storage, config)\n", - "executor = pyhdk.Executor(data_mgr, config)" + "hdk = pyhdk.hdk.HDK(\n", + " debug_logs=\"INFO\", # generates log file, DEBUG2 for more verbosity \n", + ") " ] }, { @@ -28,15 +24,8 @@ "metadata": {}, "outputs": [], "source": [ - "# Helper Functions\n", - "def get_rel_alg(sql):\n", - " return calcite.process(sql)\n", - "\n", - "def run_query(sql):\n", - " ra = get_rel_alg(sql)\n", - " # One RelAlgExecutor per query\n", - " rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, ra)\n", - " return rel_alg_executor.execute()" + "tbl = pa.Table.from_pandas(pandas.DataFrame({\"a\": [1, 2, 3], \"b\": [10, 20, 30]}))\n", + "hdk_tbl = hdk.import_arrow(tbl, \"test\")" ] }, { @@ -46,32 +35,14 @@ "metadata": {}, "outputs": [], "source": [ - "## Examples \n", - "\n", - "# Load some data\n", - "import pandas\n", - "import pyarrow as pa\n", - "\n", - "tbl = pa.Table.from_pandas(pandas.DataFrame({\"a\": [1, 2, 3], \"b\": [10, 20, 30]}))\n", - "opt = pyhdk.storage.TableOptions(2)\n", - "storage.importArrowTable(tbl, \"test\", opt)\n", - "\n", "# Basic query\n", - "print(run_query(\"SELECT * FROM 
test;\").to_arrow().to_pandas())\n", + "print(hdk.sql(\"SELECT * FROM test;\").to_arrow().to_pandas())\n", "\n", - "print(run_query(\"SELECT a, count(*), sum(b) FROM test GROUP BY a;\").to_arrow().to_pandas())\n", + "print(hdk.sql(\"SELECT a, count(*), sum(b) FROM test GROUP BY a;\").to_arrow().to_pandas())\n", "\n", "# Cleanup\n", - "storage.dropTable(\"test\")" + "hdk.drop_table(\"test\")" ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "24e70d3f", - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { @@ -90,7 +61,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.13" + "version": "3.9.16" } }, "nbformat": 4, diff --git a/examples/heterogen_demo_groupby.ipynb b/examples/heterogen_demo_groupby.ipynb new file mode 100644 index 000000000..4a23cdaac --- /dev/null +++ b/examples/heterogen_demo_groupby.ipynb @@ -0,0 +1,252 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pyhdk\n", + "import time\n", + "import pandas as pd\n", + "import pyarrow as pa\n", + "import numpy as np\n", + "import os\n", + "\n", + "hdk = pyhdk.hdk.HDK(\n", + " enable_heterogeneous=True,\n", + " force_heterogeneous_distribution=True,\n", + " enable_multifrag_heterogeneous=True,\n", + " # enable_debug_timer=True,\n", + " # debug_logs=\"INFO\" # generates log file, DEBUG2 for more verbosity \n", + ") " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def synthensizeTable(num_groups_per_col,\n", + " num_rows,\n", + " random_data_col_dt=pa.float64(),\n", + " chunk_size=None):\n", + " \"\"\"\n", + " Generates a table with num_groups_per_col columns of int64 type which have corresponding number of unique elements.\n", + " random_data_col_dt: an additional column (int64/float64) filled with random data for MIN/MAX/AVG/... reductions.\n", + " chunk_size: used to simulate reading a file with arrow which results in a chunked array (affects materialization).\n", + " \"\"\"\n", + " \n", + " if chunk_size is None:\n", + " chunk_size = num_rows\n", + " table_columns = []\n", + " column_data = []\n", + " data_col_dt = pa.int64()\n", + " for groups_count in num_groups_per_col:\n", + " groups = np.random.randint(0, groups_count, num_rows)\n", + " column_name = f\"group_{groups_count}\"\n", + " chunks = [pa.array(groups[i:i+chunk_size], data_col_dt) for i in range(0, len(groups), chunk_size)]\n", + " column = pa.chunked_array(chunks)\n", + " table_columns.append(pa.field(column_name, column.type))\n", + " column_data.append(column)\n", + "\n", + " if pa.types.is_floating(random_data_col_dt):\n", + " random_data = np.random.uniform(0.0, 1000000.0, num_rows)\n", + " else:\n", + " random_data = np.random.randint(0, 1000000, num_rows)\n", + " chunks = [pa.array(random_data[i:i+chunk_size], random_data_col_dt) for i in range(0, len(random_data), chunk_size)]\n", + " random_column = pa.chunked_array(chunks)\n", + "\n", + " table_columns.append(pa.field(\"rand_data\", random_column.type))\n", + " column_data.append(random_column)\n", + "\n", + " table_schema = pa.schema(table_columns)\n", + " groups_tbl = pa.Table.from_arrays(column_data, schema=table_schema)\n", + "\n", + " print(f\"One column has {num_rows/(1000000)} Mil. 
rows and takes {(num_rows*data_col_dt.bit_width//8)/(1024*1024):.2f} MiB\")\n", + " print(f\"Chunk size: {len(groups_tbl.column(0).chunks[0])}\")\n", + " return groups_tbl\n", + "\n", + "def fragment_size_calc(num_rows):\n", + " \"\"\"Taken from Modin, you can experiment with it.\"\"\"\n", + " cpu_count = os.cpu_count()\n", + " if cpu_count is not None:\n", + " fragment_size = num_rows // cpu_count\n", + " fragment_size = min(fragment_size, 2**25)\n", + " fragment_size = max(fragment_size, 2**18)\n", + " return fragment_size\n", + " else:\n", + " return None" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "n_data_cols = 5\n", + "num_groups = [500 * i for i in range(1,n_data_cols+1)]\n", + "# num_groups = [200, 512, 513, 1000, 2000, 5000, 10000] \n", + "tbl = synthensizeTable(num_groups_per_col=num_groups, \n", + " num_rows=20_000_000, \n", + " # random_data_col_dt=pa.int64(), \n", + " chunk_size=50000)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "default_fragment_size = fragment_size_calc(tbl.num_rows)\n", + "table_name = \"groups_table\"\n", + "hdk_tbl = hdk.import_arrow(tbl, table_name, default_fragment_size)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "gpu_proportion = 100\n", + "q_opts = {\"forced_gpu_proportion\":gpu_proportion}\n", + "q = f\"SELECT SUM(x) FROM (SELECT COUNT(*) x FROM {table_name} \"\n", + "print(f\"Running query {q}\")\n", + "for group in num_groups:\n", + " query_start = time.perf_counter()\n", + " res = hdk.sql(q + f\"GROUP BY group_{group});\", q_opts)\n", + " print(f\"Query for group_{group} took {(time.perf_counter() - query_start):.3f}s\")\n", + "\n", + "q = f\"SELECT MIN({tbl.column_names[-1]}), MAX({tbl.column_names[-1]}) FROM groups_table \"\n", + "print(f\"Running query {q}\")\n", + "for group in num_groups:\n", + " query_start = time.perf_counter()\n", + " res = hdk.sql(q + f\"GROUP BY group_{group};\", q_opts)\n", + " assert(res.to_arrow().num_rows == group)\n", + " print(f\"Query for group_{group} took {(time.perf_counter() - query_start):.3f}s\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Let's see if heterogeneous execution wins anywhere in simple aggregations?\n", + "# For group_count < 512 CPU will always win.\n", + "\n", + "num_groups = [1000 * 2*i for i in range(1,15+1)]\n", + "num_groups.append(1500)\n", + "num_groups = sorted(num_groups)\n", + "tbl = synthensizeTable(num_groups_per_col=num_groups, \n", + " num_rows=300_000_000)\n", + "\n", + "default_fragment_size = fragment_size_calc(tbl.num_rows)\n", + "frag_size = default_fragment_size // 2\n", + "table_name = \"table_for_grid\"\n", + "hdk.drop_table(table_name)\n", + "hdk_het_tbl = hdk.import_arrow(tbl, table_name, frag_size)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "prop_time = dict()\n", + "n_iters_check = 10\n", + "proportion_range = range(0,101,10)\n", + "q_per_group_size_df = pd.DataFrame()\n", + "\n", + "for g in num_groups:\n", + " for prop in proportion_range:\n", + " prop_time[prop] = []\n", + " for it in range(1+n_iters_check):\n", + " for prop in proportion_range:\n", + " agg_res1 = hdk_het_tbl.agg(f\"group_{g}\", \"avg(rand_data)\")\n", + " # agg_res1 = agg_res1.agg(\"rand_data_avg\", aggs ={\"min\":\"min(rand_data_avg)\", 
\"max\":\"max(rand_data_avg)\"})\n", + " agg_start = time.perf_counter()\n", + " agg_res1.run(forced_gpu_proportion=prop)\n", + " if it:\n", + " q_time = int((time.perf_counter() - agg_start)*1000)\n", + " prop_time[prop].append(q_time) \n", + " if q_per_group_size_df.empty:\n", + " q_per_group_size_df = pd.DataFrame({k: np.median(v) for k, v in prop_time.items()}, index=[0]).T\n", + " q_per_group_size_df.rename(columns={0: f\"group_{g}\"}, inplace=True)\n", + " else:\n", + " q_per_group_size_df[f\"group_{g}\"] = [np.median(v) for v in prop_time.values()]\n", + " hdk.clear_gpu_mem()\n", + " print(f\"Running group_{g}\")\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import importlib.util\n", + "if importlib.util.find_spec(\"matplotlib\") is None:\n", + " raise Exception(\"Please install matplotlib\")\n", + "\n", + "import matplotlib.pyplot as plt\n", + "from mpl_toolkits.mplot3d import Axes3D\n", + "\n", + "plt.figure(figsize=(13, 8), dpi=200)\n", + "plt.imshow(q_per_group_size_df, cmap='summer', aspect='auto')\n", + "plt.colorbar(label='Time(ms)')\n", + "plt.ylabel(\"GPU proportion\")\n", + "plt.xlabel(\"Number of groups per column\")\n", + "\n", + "for i in range(len(q_per_group_size_df.columns)):\n", + " for j in range(len(q_per_group_size_df.index)):\n", + " cell_color = 'black'\n", + " if q_per_group_size_df.index[j] == q_per_group_size_df.iloc[:, i].idxmin():\n", + " cell_color = 'red'\n", + " plt.text(i, j, f'{q_per_group_size_df.iloc[j, i]:.1f}', ha='center', va='center', color=cell_color)\n", + "\n", + "\n", + "plt.xticks(np.arange(len(q_per_group_size_df.columns)), q_per_group_size_df.columns, rotation=330)\n", + "plt.yticks(np.arange(len(q_per_group_size_df.index)), q_per_group_size_df.index)\n", + "plt.title(f\"Heterogeneous Aggregation, #Rows={tbl.num_rows//1_000_000}Mil., #Frags={int(np.ceil(tbl.num_rows/frag_size))}\")\n", + "plt.show()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "hdk.drop_table(table_name)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "omnisci-dev", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.16" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/examples/heterogen_demo_taxi.ipynb b/examples/heterogen_demo_taxi.ipynb new file mode 100644 index 000000000..7b4b2f381 --- /dev/null +++ b/examples/heterogen_demo_taxi.ipynb @@ -0,0 +1,347 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Initialization\n", + "import pyhdk\n", + "import time\n", + "import pandas as pd\n", + "import pyarrow as pa\n", + "import pyarrow.parquet\n", + "import pyarrow.csv\n", + "import numpy as np\n", + "import os\n", + "\n", + "hdk = pyhdk.hdk.HDK(\n", + " enable_heterogeneous=True,\n", + " force_heterogeneous_distribution=True,\n", + " enable_multifrag_heterogeneous=True,\n", + " # enable_debug_timer=True,\n", + " # debug_logs=\"INFO\" # generates log file, DEBUG2 for more verbosity \n", + ") " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Helper Functions\n", + "def import_hdk_pyarrow(arrow_tbl, hdk_tbl_name, 
fragment_size, overwrite=True):\n", + " \"\"\"\n", + " Wrapper that imports a pyarrow table to HDK with the given fragment size.\n", + " overwrite: By default overwrites previously existing table.\n", + " \"\"\"\n", + " if overwrite:\n", + " hdk.drop_table(hdk_tbl_name)\n", + " start_timer = time.perf_counter()\n", + " hdk_tbl = hdk.import_arrow(arrow_tbl, hdk_tbl_name, fragment_size)\n", + " print(f\"[PyHDK] Importing pyarrow table: {(time.perf_counter()-start_timer):.4f}s\")\n", + " return hdk_tbl\n", + "\n", + "\n", + "def fragment_size_calc(num_rows):\n", + " \"\"\"Taken from Modin, you can experiment with it.\"\"\"\n", + " cpu_count = os.cpu_count()\n", + " if cpu_count is not None:\n", + " fragment_size = num_rows // cpu_count\n", + " fragment_size = min(fragment_size, 2**25)\n", + " fragment_size = max(fragment_size, 2**18)\n", + " return fragment_size\n", + " else:\n", + " return None\n", + "\n", + "def fragment_size_test_range(num_rows):\n", + " \"\"\"\n", + " Take two power of two steps around default frag_size: [x/4,x/2,x,x*2,x*4].\n", + " \"\"\"\n", + " res_range = []\n", + " default_fragment_size = fragment_size_calc(num_rows)\n", + " print(f\"Default fragment_size={default_fragment_size}\")\n", + " power_two_steps = 2\n", + " range_start = default_fragment_size//(2**power_two_steps)\n", + " range_end = default_fragment_size*(2**power_two_steps)\n", + " fragment_size = range_start\n", + " while fragment_size < range_end+1:\n", + " res_range.append(fragment_size)\n", + " fragment_size *= 2\n", + " return res_range\n", + "\n", + "\n", + "def run_single_q_all_props(sql, q_name, prop_step, n_iters, clear_gpu_mem=False):\n", + " \"\"\"\n", + " Runs SQL query multiple times at each proportion, feel free try and experiment with loops order.\n", + " clear_gpu_mem: when True, clear GPU memory between runs\n", + " \"\"\"\n", + " col_names = [\"GPU_prop\", q_name]\n", + " prop_time = {col_names[0] : [], col_names[1]: []}\n", + " # Walking over proportions\n", + " for gpu_proportion in range(0, 101, prop_step):\n", + " # Multiple iterations\n", + " for _ in range(1, n_iters + 1):\n", + " query_start = time.perf_counter()\n", + " result = hdk.sql(sql, {\"forced_gpu_proportion\":gpu_proportion})\n", + " query_finish = time.perf_counter()\n", + " prop_time[col_names[0]].append(gpu_proportion)\n", + " prop_time[col_names[1]].append(query_finish - query_start)\n", + " if clear_gpu_mem:\n", + " hdk.clear_gpu_mem()\n", + " df_output = result.to_arrow().to_pandas()\n", + " df_prop_time = pd.DataFrame(prop_time, columns=col_names)\n", + " return [df_prop_time, df_output]\n", + "\n", + "def run_queries_all_props(query_dict, step, n_iters, clear_gpu_mem=False):\n", + " \"\"\"\n", + " Runs query dictionary of SQL queries with the following structure: dict(query_name:{SQL_string})\n", + " clear_gpu_mem: when True, clear GPU memory between runs\n", + " \"\"\"\n", + " q_timings_df = pd.DataFrame()\n", + " # new_df = old_df[['a', 'b', 'c', 'd']]\n", + " for q_name in query_dict:\n", + " [df_prop_time, df_output] = run_single_q_all_props(\n", + " query_dict[q_name], \n", + " q_name=q_name, \n", + " prop_step=step, \n", + " n_iters=n_iters, \n", + " clear_gpu_mem=clear_gpu_mem\n", + " )\n", + " if q_timings_df.empty:\n", + " q_timings_df = df_prop_time\n", + " q_timings_df.rename(columns={q_name:f\"{q_name}_#RowsOut={df_output.shape[0]}\"}, inplace=True)\n", + " else:\n", + " q_timings_df[f\"{q_name}_#RowsOut={df_output.shape[0]}\"] = df_prop_time[q_name]\n", + " return q_timings_df\n", + "\n", + "def 
test_groups_fragment_sizes(\n", + " pyarrow_tbl, \n", + " table_name,\n", + " get_queries_for_table_callback, \n", + " step, \n", + " n_iters, \n", + " clear_memory_devices=False\n", + " ):\n", + " \"\"\" \n", + " Runs queries for different fragment sizes and returns a dictionary of structure: `frag_size: timings_df`\n", + " \"\"\"\n", + " \n", + " q_per_frag_size_df = pd.DataFrame()\n", + " for frag_size in fragment_size_test_range(pyarrow_tbl.num_rows):\n", + " table_rows = pyarrow_tbl.num_rows\n", + " print(f\"Testing {table_rows} rows table with Frag.size={frag_size}\")\n", + " refragmented_view_name = f\"{table_name}_{frag_size}\"\n", + " hdk.refragmented_view(table_name, frag_size, refragmented_view_name)\n", + " queries_timings = run_queries_all_props(\n", + " get_queries_for_table_callback(refragmented_view_name), \n", + " step, \n", + " n_iters, \n", + " clear_memory_devices\n", + " )\n", + " queries_timings[\"Frag.size\"]=frag_size\n", + " if q_per_frag_size_df.empty:\n", + " q_per_frag_size_df = queries_timings\n", + " else:\n", + " q_per_frag_size_df = pd.concat([q_per_frag_size_df, queries_timings])\n", + " hdk.drop_table(refragmented_view_name)\n", + " hdk.clear_gpu_mem()\n", + " return q_per_frag_size_df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Read data (replace with real dataset)\n", + "dataset_path = \"../omniscidb/Tests/ArrowStorageDataFiles/taxi_sample_header.csv\"\n", + "table_name = \"taxi\"\n", + "# If the CSV does not have a header, please provide the column names.\n", + "pyarrow_tbl = pa.csv.read_csv(dataset_path)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Queries (NY Taxi example)\n", + "def getTaxiQ_for_table(tbl_name):\n", + " return {\n", + " \"Q1\": f\"SELECT cab_type, count(*)\\\n", + " FROM {tbl_name}\\\n", + " GROUP BY cab_type;\",\n", + " \"Q2\": f\"SELECT passenger_count, avg(total_amount)\\\n", + " FROM {tbl_name}\\\n", + " GROUP BY passenger_count;\",\n", + " \"Q3\": f\"SELECT passenger_count, extract(year from pickup_datetime) as pickup_year, count(*)\\\n", + " FROM {tbl_name}\\\n", + " GROUP BY passenger_count, extract(year from pickup_datetime);\",\n", + " \"Q4\": f\"SELECT passenger_count,\\\n", + " extract(year from pickup_datetime) as pickup_year,\\\n", + " cast(trip_distance as int) AS distance,\\\n", + " count(*) AS the_count\\\n", + " FROM {tbl_name}\\\n", + " GROUP BY passenger_count,\\\n", + " pickup_year,\\\n", + " distance\\\n", + " ORDER BY passenger_count, pickup_year, distance, the_count;\"\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "hdk_tbl = import_hdk_pyarrow(pyarrow_tbl, table_name, fragment_size_calc(pyarrow_tbl.num_rows))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "prop_step = 25\n", + "n_iters_per_prop = 3" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "default_timings_df = run_queries_all_props(\n", + " getTaxiQ_for_table(table_name),\n", + " prop_step,\n", + " n_iters_per_prop\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "timing_per_frag_df = test_groups_fragment_sizes(\n", + " pyarrow_tbl,\n", + " table_name,\n", + " getTaxiQ_for_table,\n", + " prop_step,\n", + " n_iters_per_prop\n", 
+ ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import importlib.util\n", + "if importlib.util.find_spec(\"matplotlib\") is None:\n", + " raise Exception(\"Please install matplotlib\")\n", + "\n", + "import matplotlib.pyplot as plt\n", + "%matplotlib inline\n", + "plt.rcParams[\"figure.figsize\"] = (8,4)\n", + "styles = ['s-','o-','^-','+-','*-',',-']\n", + "\n", + "def plotTimings(dict_of_df_timings, plot_name=\"Time vs GPU proportion\"):\n", + " ylab = \"Time (s)\"\n", + " xlab = \"Data proportion on GPU (%)\"\n", + " df_agg = dict_of_df_timings.groupby([\"GPU_prop\"]).median()\n", + " df_agg.plot(xlabel=xlab, ylabel=ylab, title=plot_name)\n", + " plt.legend(bbox_to_anchor=(1.01, 1), loc=\"upper left\")\n", + " plt.tight_layout()\n", + "\n", + "def plotTimingsFrags(timing_per_frag_df, default_frag_size = None, row_count = None, plot_name=\"Time vs GPU proportion\"):\n", + " ylab = \"Time (ms)\"\n", + " xlab = \"Data proportion on GPU (%)\"\n", + " fig, axes = plt.subplots(timing_per_frag_df.shape[1]-2,1)\n", + " fig.set_size_inches(7,9)\n", + " frag_sizes = timing_per_frag_df[\"Frag.size\"].unique()\n", + " for frag_size in frag_sizes:\n", + " frag_df = timing_per_frag_df[timing_per_frag_df[\"Frag.size\"]==frag_size].groupby([\"GPU_prop\"]).median()\n", + " frag_df = frag_df.drop(\"Frag.size\", axis=1)\n", + " frag_df *= 1000 \n", + " for enum, q_name in enumerate(frag_df):\n", + " df_agg = frag_df[q_name]\n", + " subplot_title = q_name\n", + " lab = f\"Frag.size={frag_size}\" if row_count is None else f\"Num.frags={int(np.ceil(row_count/frag_size))}\"\n", + " if default_frag_size is not None and default_frag_size == frag_size:\n", + " lab = f\"{lab} (CPU opt)\"\n", + " if frag_size == np.max(frag_sizes):\n", + " lab = f\"{lab} (GPU opt.)\"\n", + " df_agg.plot(\n", + " ax=axes[enum], \n", + " xlabel=xlab, \n", + " ylabel=ylab, \n", + " title=subplot_title, \n", + " style=styles[enum], \n", + " label=lab\n", + " )\n", + " axes[enum].legend(bbox_to_anchor=(1.01, 1.02), loc=\"upper left\")\n", + " fig.suptitle(plot_name)\n", + " fig.tight_layout()\n", + " fig.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "default_frag_size = fragment_size_calc(pyarrow_tbl.num_rows)\n", + "plotTimingsFrags(\n", + " timing_per_frag_df, \n", + " default_frag_size=default_frag_size, \n", + " row_count=pyarrow_tbl.num_rows,\n", + " plot_name=f\"Taxi, #Rows={pyarrow_tbl.num_rows//(1000*1000)}Mil.\"\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# HDK Cleanup\n", + "hdk.dropTable(table_name)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "omnisci-dev", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.16" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/examples/heterogeneous_demo.ipynb b/examples/heterogeneous_demo.ipynb deleted file mode 100644 index 39bdd42e6..000000000 --- a/examples/heterogeneous_demo.ipynb +++ /dev/null @@ -1,351 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Initialization\n", - "import 
pyhdk \n", - "import pandas\n", - "import time\n", - "import pyarrow as pa\n", - "import pyarrow.csv\n", - "import os, sys\n", - "\n", - "config = pyhdk.buildConfig(enable_heterogeneous=True,\n", - " force_heterogeneous_distribution=True,\n", - " enable_multifrag_heterogeneous=True,\n", - " enable_debug_timer=False,\n", - " )\n", - "# pyhdk.initLogger(log_severity=\"INFO\")\n", - "storage = pyhdk.storage.ArrowStorage(1)\n", - "data_mgr = pyhdk.storage.DataMgr(config)\n", - "data_mgr.registerDataProvider(storage)\n", - "\n", - "calcite = pyhdk.sql.Calcite(storage, config)\n", - "executor = pyhdk.Executor(data_mgr, config)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Helper Functions\n", - "default_step = 50\n", - "default_iters = 3\n", - "\n", - "def get_rel_alg(sql):\n", - " return calcite.process(sql)\n", - "\n", - "def run_query(sql):\n", - " ra = get_rel_alg(sql)\n", - " # One RelAlgExecutor per query\n", - " rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, ra)\n", - " return rel_alg_executor.execute()\n", - "\n", - "\n", - "def import_hdk_pyarrow(storage, arrow_table, hdk_table_name, fragment_size, overwrite=True):\n", - " \"\"\"\n", - " Imports a pyarrow table to HDK with the given fragment size.\n", - " overwrite: By default overwrites previously existing table.\n", - " \"\"\"\n", - " opt = pyhdk.storage.TableOptions(fragment_size)\n", - " start_timer = time.perf_counter()\n", - " try:\n", - " storage.importArrowTable(arrow_table, hdk_table_name, opt)\n", - " except:\n", - " if not overwrite:\n", - " raise Exception(f\"Cannot overwrite table{hdk_table_name}, overwrite={overwrite}\")\n", - " storage.dropTable(hdk_table_name)\n", - " storage.importArrowTable(arrow_table, hdk_table_name, opt)\n", - " print(f\"[PyHDK] Importing pyarrow table: {(time.perf_counter()-start_timer):.4f}s\")\n", - "\n", - "\n", - "def run_query_het_all_props(sql, query_name=\"\", prop_step=default_step, n_iters=default_iters, clear_memory_devices=[]):\n", - " \"\"\"\n", - " Runs SQL query multiple times at each proportion, feel free try and experiment with loops order.\n", - " clear_memory_devices: clear memory of the device manager: 1:CPU, 2:GPU \n", - " \"\"\"\n", - " cython_enum_dict = {\"CPU\":1, \"GPU\":2} # May move up to cython for easier interface\n", - " ra = get_rel_alg(sql)\n", - " col_names = [\"GPU_prop\", f\"QueryT_{query_name}\"]\n", - " prop_time = {col_names[0] : [], col_names[1]: []}\n", - " # Walking over proportions\n", - " for gpu_proportion in range(0, 101, prop_step):\n", - " # Multiple iterations\n", - " for _ in range(1, n_iters + 1):\n", - " rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, ra)\n", - " query_start = time.perf_counter()\n", - " result = rel_alg_executor.execute(forced_gpu_proportion=gpu_proportion)\n", - " query_finish = time.perf_counter()\n", - " prop_time[col_names[0]].append(gpu_proportion)\n", - " prop_time[col_names[1]].append(query_finish - query_start)\n", - " [executor.clearMemory(data_mgr, cython_enum_dict[device]) for device in clear_memory_devices]\n", - "\n", - " df_prop_time = pandas.DataFrame(prop_time, columns=col_names)\n", - " # Some metadata to get idea about the output cardinality\n", - " df_output = result.to_arrow().to_pandas()\n", - " output_size_KB = df_output.memory_usage(index=True).sum() // (1024)\n", - " df_prop_time.rename(columns={col_names[1]:f\"{col_names[1]}_{output_size_KB}KB\"}, inplace=True)\n", - " return 
[df_prop_time, df_output]\n", - "\n", - "def run_queries_all_props(query_dict, step=default_step, n_iters=default_iters, clear_memory_devices=[]):\n", - " \"\"\"\n", - " Runs query dictionary of SQL queries with the following structure: dict(query_name:{SQL_string})\n", - " clear_memory_devices: clear memory of the device manager after each query: \"CPU\", \"GPU\" \n", - " \"\"\"\n", - " q_timings_dict = dict()\n", - " for q_name in query_dict:\n", - " [df_prop_time, df_output] = run_query_het_all_props(query_dict[q_name], \n", - " query_name=q_name, \n", - " prop_step=step, \n", - " n_iters=n_iters, \n", - " clear_memory_devices=clear_memory_devices)\n", - " df_prop_time.set_index(\"GPU_prop\", inplace=True)\n", - " q_timings_dict[q_name] = (df_prop_time)\n", - " return q_timings_dict\n", - "\n", - "def fragment_size_calc(num_rows):\n", - " \"\"\"Taken from Modin, you can experiment with it.\"\"\"\n", - " cpu_count = os.cpu_count()\n", - " if cpu_count is not None:\n", - " fragment_size = num_rows // cpu_count\n", - " fragment_size = min(fragment_size, 2**25)\n", - " fragment_size = max(fragment_size, 2**18)\n", - " return fragment_size\n", - " else:\n", - " return None\n", - "\n", - "def fragment_size_test_range(num_rows):\n", - " \"\"\"\n", - " Take two power of two steps around default frag_size: [x/4,x/2,x,x*2,x*4].\n", - " \"\"\"\n", - " res_range = []\n", - " default_fragment_size = fragment_size_calc(num_rows)\n", - " print(f\"Default fragment_size={default_fragment_size}\")\n", - " power_two_steps = 2\n", - " range_start = default_fragment_size//(2**power_two_steps)\n", - " range_end = default_fragment_size*(2**power_two_steps)\n", - " fragment_size = range_start\n", - " while fragment_size < range_end+1:\n", - " res_range.append(fragment_size)\n", - " fragment_size *= 2\n", - " return res_range\n", - "\n", - "def test_groups_fragment_sizes(storage, pyarrow_tbl, table_name, get_q_dict_callback, step, n_iters, clear_memory_devices=[]):\n", - " \"\"\" \n", - " Produces the follwing result grouping: fragment_size{query_name{timings_df}}\n", - " \"\"\"\n", - " part_group_timings_dict = dict()\n", - " for frag_size in fragment_size_test_range(pyarrow_tbl.num_rows):\n", - " table_size_MB = pyarrow_tbl.nbytes // (1024*1024)\n", - " print(f\"Testing {table_size_MB}MB Table with Frag.size={frag_size}\")\n", - " refragmented_view_name = f\"{table_name}_{frag_size}\"\n", - " storage.createRefragmentedView(table_name, refragmented_view_name, frag_size)\n", - " part_group_timings_dict[f\"Tbl_size_{table_size_MB}MB_frag_size_{frag_size}\"] = run_queries_all_props(get_q_dict_callback(refragmented_view_name), step, n_iters, clear_memory_devices)\n", - " storage.dropTable(refragmented_view_name)\n", - " return part_group_timings_dict" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Read data\n", - "dataset_path = \"../omniscidb/Tests/ArrowStorageDataFiles/taxi_sample_header.csv\"\n", - "table_name = \"taxi\"\n", - "# If the CSV does not have a header, please provide the column names.\n", - "pyarrow_tbl = pa.csv.read_csv(dataset_path)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Queries (NY Taxi example)\n", - "select_queries = {\n", - " \"simple_select\" : f\"SELECT * FROM {table_name};\",\n", - " \"select_less\" : f\"SELECT * FROM {table_name} WHERE rate_code_id > 1;\",\n", - "}\n", - "\n", - "groupby_queries = {\n", - " \"simple_groupby\" : f\"SELECT 
Count(*) FROM {table_name} GROUP BY rate_code_id;\",\n", - " \"group_by_larger\" : f\"SELECT total_amount, COUNT(*) FROM {table_name} GROUP BY total_amount;\",\n", - "}\n", - "\n", - "def getTaxiQ_for_table(tbl_name):\n", - " return {\n", - " \"Q1\": f\"SELECT cab_type, count(*)\\\n", - " FROM {tbl_name}\\\n", - " GROUP BY cab_type;\",\n", - " \"Q2\": f\"SELECT passenger_count, avg(total_amount)\\\n", - " FROM {tbl_name}\\\n", - " GROUP BY passenger_count;\",\n", - " \"Q3\": f\"SELECT passenger_count, extract(year from pickup_datetime) as pickup_year, count(*)\\\n", - " FROM {tbl_name}\\\n", - " GROUP BY passenger_count, extract(year from pickup_datetime);\",\n", - " \"Q4\": f\"SELECT passenger_count,\\\n", - " extract(year from pickup_datetime) as pickup_year,\\\n", - " cast(trip_distance as int) AS distance,\\\n", - " count(*) AS the_count\\\n", - " FROM {tbl_name}\\\n", - " GROUP BY passenger_count,\\\n", - " pickup_year,\\\n", - " distance\\\n", - " ORDER BY passenger_count, pickup_year, distance, the_count;\"\n", - "}" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# # Run Queries (kernel may crush, HDK-side issues)\n", - "default_fragment_size = fragment_size_calc(pyarrow_tbl.num_rows)\n", - "import_hdk_pyarrow(storage, pyarrow_tbl, table_name, default_fragment_size)\n", - "\n", - "# select_queries_timings = run_queries_all_props(select_queries,20,4) # on large tables can take quite some time on GPU\n", - "groupby_queries_timings = run_queries_all_props(groupby_queries,10,10,clear_memory_devices=[\"CPU\", \"GPU\"])\n", - "taxi_queries_timings = run_queries_all_props(getTaxiQ_for_table(table_name),10,3)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "taxi_frags = test_groups_fragment_sizes(storage, pyarrow_tbl, table_name, getTaxiQ_for_table, 10, 4)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import importlib.util\n", - "if importlib.util.find_spec(\"matplotlib\") is None:\n", - " raise Exception(\"Please install matplotlib\")\n", - "\n", - "import matplotlib.pyplot as plt\n", - "%matplotlib inline\n", - "plt.rcParams[\"figure.figsize\"] = (8,4)\n", - "styles = ['s-','o-','^-','+-','*-',',-']\n", - "\n", - "def plotTimings(dict_of_df_timings, plot_name=\"Time vs GPU proportion\"):\n", - " ylab = \"Time (s)\"\n", - " xlab = \"Data proportion on GPU (%)\"\n", - " df_accumulator = None\n", - " for q_name in dict_of_df_timings:\n", - " df_agg = dict_of_df_timings[q_name].groupby([\"GPU_prop\"]).median()\n", - " if df_accumulator is None:\n", - " df_accumulator = df_agg\n", - " else:\n", - " df_accumulator = df_accumulator.merge(df_agg, on=[\"GPU_prop\"])\n", - " df_accumulator.plot(xlabel=xlab, ylabel=ylab, title=plot_name)\n", - " plt.legend(bbox_to_anchor=(1.01, 1), loc=\"upper left\")\n", - " plt.tight_layout()\n", - "\n", - "\n", - "def swap_dict_levels(dict_frag_to_q):\n", - " \"\"\" \n", - " Normally, we have the following grouping: \n", - " query_name{timings_df}, \n", - " for fragments we have:\n", - " fragment_size{query_name{timings_df}},\n", - " so to make plotting taxi queries simpler, we change it to:\n", - " query_name{fragment_size{timings_df}}, \n", - " feel free to change the query grouping structure.\n", - " \"\"\"\n", - " transformed_q_frag = dict()\n", - " for frag_size, q_values in dict_frag_to_q.items():\n", - " for q, value in q_values.items():\n", - " if q not 
in transformed_q_frag:\n", - " transformed_q_frag[q] = value\n", - " else:\n", - " transformed_q_frag[q] = pandas.concat([transformed_q_frag[q], value], axis=1)\n", - " l = transformed_q_frag[q].columns.tolist()\n", - " transformed_q_frag[q] = transformed_q_frag[q].rename(columns={l[-1] :l[-1]+ f\"_{frag_size}\"})\n", - " return transformed_q_frag\n", - "\n", - "def plotTimingsFrags(dict_of_df_timings, plot_name=\"Time vs GPU proportion\"):\n", - " ylab = \"Time (s)\"\n", - " xlab = \"Data proportion on GPU (%)\"\n", - " fig, axes = plt.subplots(len(dict_of_df_timings),1)\n", - " fig.set_size_inches(7,9)\n", - " for enum, q_name in enumerate(dict_of_df_timings):\n", - " df_agg = dict_of_df_timings[q_name].groupby([\"GPU_prop\"]).median()\n", - " # Cut redundand query info\n", - " df_agg.rename(columns=lambda x: '_'.join(x.split('_')[-3:]), inplace=True)\n", - " subplot_title = q_name\n", - " df_agg.plot(ax=axes[enum], xlabel=xlab, ylabel=ylab, title=subplot_title, style=styles[enum])\n", - " axes[enum].legend(bbox_to_anchor=(1.01, 1.02), loc=\"upper left\")\n", - "\n", - " fig.tight_layout()\n", - " fig.show()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "plotTimings(groupby_queries_timings)\n", - "plotTimings(taxi_queries_timings, \"Taxi queries\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "plotTimingsFrags(swap_dict_levels(taxi_frags))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# HDK Cleanup\n", - "storage.dropTable(table_name)" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "omnisci-dev", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.16" - }, - "orig_nbformat": 4 - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/python/pyhdk/_builder.pyx b/python/pyhdk/_builder.pyx index 9d649b928..9c21bc1d9 100644 --- a/python/pyhdk/_builder.pyx +++ b/python/pyhdk/_builder.pyx @@ -470,10 +470,10 @@ cdef class QueryNode: def __getitem__(self, col): return self.ref(col) - def refragmented_view(self, fragment_size, new_table_name=None): + def refragmented_view(self, fragment_size, refragmented_view_name=None): if not self.is_scan: raise TypeError("Only table scan QueryNode can be refragmented") - res_name, exists = self._hdk._process_import_table_name(new_table_name) + res_name, exists = self._hdk._process_import_table_name(refragmented_view_name) if exists: raise RuntimeError("New table name already exists") self._hdk._storage.createRefragmentedView(self.table_name, res_name, fragment_size) diff --git a/python/pyhdk/_common.pyx b/python/pyhdk/_common.pyx index 57628514a..1285805d5 100644 --- a/python/pyhdk/_common.pyx +++ b/python/pyhdk/_common.pyx @@ -199,14 +199,16 @@ def buildConfig(*, log_dir="hdk_log", **kwargs): config.c_config.get().debug.log_dir = log_dir return config -def initLogger(*, debug_logs=False, log_dir="hdk_log", **kwargs): +def initLogger(*, debug_logs=None, log_dir="hdk_log", **kwargs): argv0 = "PyHDK".encode('UTF-8') cdef char *cargv0 = argv0 cdef string default_log_dir = log_dir cdef unique_ptr[CLogOptions] opts = make_unique[CLogOptions](cargv0, default_log_dir) cmd_str = "".join(' --%s %r' % arg 
for arg in kwargs.iteritems()) cmd_str = cmd_str.replace("_", "-") + if isinstance(debug_logs, str): + cmd_str += ("--log-severity="+debug_logs) opts.get().parse_command_line(argv0, cmd_str) - if debug_logs: + if not isinstance(debug_logs, str) and debug_logs: opts.get().severity_ = CSeverity.DEBUG3 CInitLogger(dereference(opts)) diff --git a/python/pyhdk/hdk.py b/python/pyhdk/hdk.py index 005f6c386..f91427ca3 100644 --- a/python/pyhdk/hdk.py +++ b/python/pyhdk/hdk.py @@ -1585,6 +1585,24 @@ def proj(self, *args, exprs=None, **kwargs): """ pass + def refragmented_view(self, fragment_size, refragmented_view_name=None): + """ + Creates a refragmented view of an existing table. + Parameters + ---------- + fragment_size : int + Specifies new fragment size of a table + refragmented_view_name: str + Can be used to give a custom name for a view + + Returns + ------- + QueryNode + A scan query node referencing refragmented view. + + """ + pass + def agg(self, group_keys, *args, aggs=None, **kwargs): """ Create an aggregation node with the current node as its input. @@ -1592,7 +1610,7 @@ def agg(self, group_keys, *args, aggs=None, **kwargs): Parameters ---------- group_keys : int, str, QueryExpr or iterable - Group key used fro aggregation. Integer and string values can be used + Group key used for aggregation. Integer and string values can be used to reference input columns by its index or name. QueryExpr expressions can be used to simply reference input columns or build more complex group keys. @@ -2388,7 +2406,7 @@ def _process_import_table_name(self, table_name): elif isinstance(table_name, str): exists = self._storage.tableInfo(table_name) is not None elif table_name is None: - res_name = "tabe_" + uuid.uuid4().hex + res_name = "table_" + uuid.uuid4().hex else: raise TypeError( f"Expected str or QueryNode for 'table_name' arg. Got: {type(table_name)}." @@ -2500,6 +2518,41 @@ def sql(self, sql_query, query_opts=None, **kwargs): res.scan = self.scan(res.table_name) return res + def clear_gpu_mem(self): + """ + Clears GPU memory of all previously transferred buffers. + """ + self._executor.clearMemory(self._data_mgr, 2) + + def refragmented_view(self, table, fragment_size, refragmented_view_name=None): + """ + Creates a refragmented table view of an existing non-view table. + Parameters + ---------- + table: str or + Name of the table to refragment + fragment_size : int + Specifies new fragment size of a table + refragmented_view_name: str, default: None + Can be used to give a custom name for a view + + Returns + ------- + QueryNode + A scan query node referencing refragmented view. + + """ + + if isinstance(table, QueryNode) and table.is_scan: + return table.refragmented_view(fragment_size, refragmented_view_name) + if isinstance(table, str): + return self.scan(table).refragmented_view( + fragment_size, refragmented_view_name + ) + raise TypeError( + f"Expected str or table scan QueryNode for a table name alias. Got: {type(table)}." + ) + def scan(self, table_name): """ Create a scan query node referencing specified table. diff --git a/python/tests/test_pyhdk_api.py b/python/tests/test_pyhdk_api.py index b9ebebc9e..f3de1cc4c 100644 --- a/python/tests/test_pyhdk_api.py +++ b/python/tests/test_pyhdk_api.py @@ -175,7 +175,7 @@ def test_refragment(self, exe_cfg): ) # Refragment a table - res = trips.refragmented_view(2, new_table_name="trips_2") + res = trips.refragmented_view(2, refragmented_view_name="trips_2") res1 = trips.refragmented_view(3) assert res.is_scan
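For reviewers who want to try the API surface touched by this change end to end, below is a minimal, self-contained sketch assembled only from calls that appear in the notebooks and in the added hdk.py methods above (import_arrow, refragmented_view, the forced_gpu_proportion query option, clear_gpu_mem, drop_table). The table name, data, and fragment sizes are illustrative, and the GPU-related calls assume a CUDA-enabled pyhdk build.

# Minimal usage sketch based on the calls shown in this change; names and sizes are illustrative.
import numpy as np
import pyarrow as pa
import pyhdk

hdk = pyhdk.hdk.HDK(
    enable_heterogeneous=True,
    force_heterogeneous_distribution=True,
)

# Import an Arrow table with an explicit fragment size.
n_rows = 1_000_000
tbl = pa.Table.from_arrays(
    [pa.array(np.random.randint(0, 100, n_rows)), pa.array(np.random.rand(n_rows))],
    names=["key", "val"],
)
hdk_tbl = hdk.import_arrow(tbl, "demo", n_rows // 8)

# Create a refragmented view instead of re-importing (new in this change);
# the view name is optional and auto-generated when omitted.
view = hdk.refragmented_view("demo", 50_000, "demo_small_frags")

# Run a query with a forced CPU/GPU data split, then release GPU buffers.
res = hdk.sql(
    "SELECT key, AVG(val) FROM demo_small_frags GROUP BY key;",
    {"forced_gpu_proportion": 50},
)
print(res.to_arrow().to_pandas().head())
hdk.clear_gpu_mem()

# Cleanup.
hdk.drop_table("demo_small_frags")
hdk.drop_table("demo")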