From 6ca13cce6b7887c6b88f2124728981523ba68733 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 30 Dec 2024 10:17:38 +0200 Subject: [PATCH 1/7] added optimize delta tables to support all delta tables --- src/sempy_labs/__init__.py | 2 + src/sempy_labs/_helper_functions.py | 71 +++++++++++++++-------- src/sempy_labs/_optimize.py | 80 ++++++++++++++++++++++++++ src/sempy_labs/lakehouse/_lakehouse.py | 32 ++--------- 4 files changed, 133 insertions(+), 52 deletions(-) create mode 100644 src/sempy_labs/_optimize.py diff --git a/src/sempy_labs/__init__.py b/src/sempy_labs/__init__.py index da57e489..0e8f9307 100644 --- a/src/sempy_labs/__init__.py +++ b/src/sempy_labs/__init__.py @@ -1,4 +1,5 @@ from sempy_labs._job_scheduler import list_item_job_instances +from sempy_labs._optimize import optimize_delta_tables from sempy_labs._gateways import ( list_gateway_members, list_gateway_role_assigments, @@ -470,4 +471,5 @@ "bind_semantic_model_to_gateway", "list_semantic_model_errors", "list_item_job_instances", + "optimize_delta_tables", ] diff --git a/src/sempy_labs/_helper_functions.py b/src/sempy_labs/_helper_functions.py index 64b33f45..f61fa52e 100644 --- a/src/sempy_labs/_helper_functions.py +++ b/src/sempy_labs/_helper_functions.py @@ -172,6 +172,31 @@ def resolve_item_name_and_id( return item_name, item_id +def resolve_lakehouse_name_and_id( + lakehouse: Optional[str | UUID] = None, workspace: Optional[str | UUID] = None +) -> Tuple[str, UUID]: + + (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace) + + if lakehouse is None: + lakehouse_id = fabric.get_lakehouse_id() + lakehouse_name = fabric.resolve_item_name( + item_id=lakehouse_id, type="Lakehouse", workspace=workspace_id + ) + elif _is_valid_uuid(lakehouse): + lakehouse_id = lakehouse + lakehouse_name = fabric.resolve_item_name( + item_id=lakehouse_id, type="Lakehouse", workspace=workspace_id + ) + else: + lakehouse_name = lakehouse + lakehouse_id = fabric.resolve_item_id( + item_name=lakehouse, type="Lakehouse", workspace=workspace_id + ) + + return lakehouse_name, lakehouse_id + + def resolve_dataset_name_and_id( dataset: str | UUID, workspace: Optional[str | UUID] = None ) -> Tuple[str, UUID]: @@ -426,7 +451,7 @@ def save_as_delta_table( write_mode: str, merge_schema: bool = False, schema: Optional[dict] = None, - lakehouse: Optional[str] = None, + lakehouse: Optional[str | UUID] = None, workspace: Optional[str | UUID] = None, ): """ @@ -444,8 +469,8 @@ def save_as_delta_table( Merges the schemas of the dataframe to the delta table. schema : dict, default=None A dictionary showing the schema of the columns and their data types. - lakehouse : str, default=None - The Fabric lakehouse used by the Direct Lake semantic model. + lakehouse : str | uuid.UUID, default=None + The Fabric lakehouse name or ID. Defaults to None which resolves to the lakehouse attached to the notebook. workspace : str | uuid.UUID, default=None The Fabric workspace name or ID. 
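Below is a minimal usage sketch for the resolve_lakehouse_name_and_id helper added above. It assumes a Fabric notebook where sempy_labs is installed; the lakehouse and workspace names are placeholders, not values taken from this patch.

    from sempy_labs._helper_functions import resolve_lakehouse_name_and_id

    # With no arguments the helper falls back to the lakehouse attached to the notebook.
    lakehouse_name, lakehouse_id = resolve_lakehouse_name_and_id()

    # Either a lakehouse name or its UUID is accepted; both return the same (name, id) pair.
    lakehouse_name, lakehouse_id = resolve_lakehouse_name_and_id(
        lakehouse="MyLakehouse", workspace="My Workspace"
    )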
@@ -468,21 +493,16 @@ def save_as_delta_table( ) (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace) + (lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id( + lakehouse=lakehouse, workspace=workspace_id + ) - if lakehouse is None: - lakehouse_id = fabric.get_lakehouse_id() - lakehouse = resolve_lakehouse_name( - lakehouse_id=lakehouse_id, workspace=workspace_id - ) - else: - lakehouse_id = resolve_lakehouse_id(lakehouse, workspace_id) - - writeModes = ["append", "overwrite"] + write_modes = ["append", "overwrite"] write_mode = write_mode.lower() - if write_mode not in writeModes: + if write_mode not in write_modes: raise ValueError( - f"{icons.red_dot} Invalid 'write_type' parameter. Choose from one of the following values: {writeModes}." + f"{icons.red_dot} Invalid 'write_type' parameter. Choose from one of the following values: {write_modes}." ) if " " in delta_table_name: @@ -507,16 +527,19 @@ def save_as_delta_table( "timestamp": TimestampType(), } - if schema is None: - spark_df = spark.createDataFrame(dataframe) + if isinstance(dataframe, pd.DataFrame): + if schema is None: + spark_df = spark.createDataFrame(dataframe) + else: + schema_map = StructType( + [ + StructField(column_name, type_mapping[data_type], True) + for column_name, data_type in schema.items() + ] + ) + spark_df = spark.createDataFrame(dataframe, schema_map) else: - schema_map = StructType( - [ - StructField(column_name, type_mapping[data_type], True) - for column_name, data_type in schema.items() - ] - ) - spark_df = spark.createDataFrame(dataframe, schema_map) + spark_df = dataframe filePath = create_abfss_path( lakehouse_id=lakehouse_id, @@ -531,7 +554,7 @@ def save_as_delta_table( else: spark_df.write.mode(write_mode).format("delta").save(filePath) print( - f"{icons.green_dot} The dataframe has been saved as the '{delta_table_name}' table in the '{lakehouse}' lakehouse within the '{workspace_name}' workspace." + f"{icons.green_dot} The dataframe has been saved as the '{delta_table_name}' table in the '{lakehouse_name}' lakehouse within the '{workspace_name}' workspace." ) diff --git a/src/sempy_labs/_optimize.py b/src/sempy_labs/_optimize.py new file mode 100644 index 00000000..5537fe06 --- /dev/null +++ b/src/sempy_labs/_optimize.py @@ -0,0 +1,80 @@ +import pandas as pd +from typing import Optional, Union, List +from uuid import UUID +from sempy_labs._helper_functions import ( + resolve_workspace_name_and_id, + resolve_lakehouse_name_and_id, + resolve_item_name_and_id, + create_abfss_path, +) +from tqdm.auto import tqdm + + +def optimize_delta_tables( + tables: Optional[Union[str, List[str]]] = None, + source: Optional[str | UUID] = None, + source_type: str = "Lakehouse", + workspace: Optional[str | UUID] = None, +): + """ + Runs the `OPTIMIZE `_ function over the specified delta tables. + + Parameters + ---------- + tables : str | List[str], default=None + The table(s) to optimize. + Defaults to None which resovles to optimizing all tables within the lakehouse. + source : str | uuid.UUID, default=None + The source location of the delta table (i.e. lakehouse). + Defaults to None which resolves to the lakehouse attached to the notebook. + source_type : str, default="Lakehouse" + The source type (i.e. "Lakehouse", "SemanticModel") + workspace : str | uuid.UUID, default=None + The Fabric workspace name or ID used by the lakehouse. + Defaults to None which resolves to the workspace of the attached lakehouse + or if no lakehouse attached, resolves to the workspace of the notebook. 
+ """ + + from pyspark.sql import SparkSession + from sempy_labs.lakehouse._get_lakehouse_tables import get_lakehouse_tables + from delta import DeltaTable + + (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace) + + if source is None: + (item_name, item_id) = resolve_lakehouse_name_and_id() + else: + (item_name, item_id) = resolve_item_name_and_id( + item=source, type=source_type, workspace=workspace_id + ) + + if isinstance(tables, str): + tables = [tables] + + if source_type == "Lakehouse": + dfL = get_lakehouse_tables(lakehouse=item_name, workspace=workspace_id) + dfL_delta = dfL[dfL["Format"] == "delta"] + + if tables is not None: + delta_tables = dfL_delta[dfL_delta["Table Name"].isin(tables)] + else: + delta_tables = dfL_delta.copy() + else: + data = [] + for t in tables: + new_data = { + "Table Name": t, + "Location": create_abfss_path(workspace_id, item_id, t), + } + data.append(new_data) + + delta_tables = pd.DataFrame(data) + + spark = SparkSession.builder.getOrCreate() + + for _, r in (bar := tqdm(delta_tables.iterrows())): + tableName = r["Table Name"] + tablePath = r["Location"] + bar.set_description(f"Optimizing the '{tableName}' table...") + deltaTable = DeltaTable.forPath(spark, tablePath) + deltaTable.optimize().executeCompaction() diff --git a/src/sempy_labs/lakehouse/_lakehouse.py b/src/sempy_labs/lakehouse/_lakehouse.py index 85202c12..ac27f1eb 100644 --- a/src/sempy_labs/lakehouse/_lakehouse.py +++ b/src/sempy_labs/lakehouse/_lakehouse.py @@ -52,35 +52,11 @@ def optimize_lakehouse_tables( or if no lakehouse attached, resolves to the workspace of the notebook. """ - from pyspark.sql import SparkSession - from sempy_labs.lakehouse._get_lakehouse_tables import get_lakehouse_tables - from delta import DeltaTable - - (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace) - - if lakehouse is None: - lakehouse_id = fabric.get_lakehouse_id() - lakehouse = resolve_lakehouse_name(lakehouse_id, workspace_id) - - lakeTables = get_lakehouse_tables(lakehouse=lakehouse, workspace=workspace_id) - lakeTablesDelta = lakeTables[lakeTables["Format"] == "delta"] - - if isinstance(tables, str): - tables = [tables] - - if tables is not None: - tables_filt = lakeTablesDelta[lakeTablesDelta["Table Name"].isin(tables)] - else: - tables_filt = lakeTablesDelta.copy() + from sempy_labs._optimize import optimize_delta_tables - spark = SparkSession.builder.getOrCreate() - - for _, r in (bar := tqdm(tables_filt.iterrows())): - tableName = r["Table Name"] - tablePath = r["Location"] - bar.set_description(f"Optimizing the '{tableName}' table...") - deltaTable = DeltaTable.forPath(spark, tablePath) - deltaTable.optimize().executeCompaction() + optimize_delta_tables( + tables=tables, source=lakehouse, source_type="Lakehouse", workspace=workspace + ) @log From 286a5ffe12a3350b614b0b87efaed1e203357e93 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 30 Dec 2024 10:45:11 +0200 Subject: [PATCH 2/7] warehouse support ID --- src/sempy_labs/_helper_functions.py | 58 +++++++++++++++-------------- src/sempy_labs/_sql.py | 18 +++++---- src/sempy_labs/_warehouses.py | 12 +++--- 3 files changed, 47 insertions(+), 41 deletions(-) diff --git a/src/sempy_labs/_helper_functions.py b/src/sempy_labs/_helper_functions.py index f61fa52e..bbc5c486 100644 --- a/src/sempy_labs/_helper_functions.py +++ b/src/sempy_labs/_helper_functions.py @@ -177,21 +177,22 @@ def resolve_lakehouse_name_and_id( ) -> Tuple[str, UUID]: (workspace_name, workspace_id) = 
resolve_workspace_name_and_id(workspace)
+    type = "Lakehouse"
 
     if lakehouse is None:
         lakehouse_id = fabric.get_lakehouse_id()
         lakehouse_name = fabric.resolve_item_name(
-            item_id=lakehouse_id, type="Lakehouse", workspace=workspace_id
+            item_id=lakehouse_id, type=type, workspace=workspace_id
         )
     elif _is_valid_uuid(lakehouse):
         lakehouse_id = lakehouse
         lakehouse_name = fabric.resolve_item_name(
-            item_id=lakehouse_id, type="Lakehouse", workspace=workspace_id
+            item_id=lakehouse_id, type=type, workspace=workspace_id
         )
     else:
         lakehouse_name = lakehouse
         lakehouse_id = fabric.resolve_item_id(
-            item_name=lakehouse, type="Lakehouse", workspace=workspace_id
+            item_name=lakehouse, type=type, workspace=workspace_id
         )
 
     return lakehouse_name, lakehouse_id
@@ -201,18 +202,9 @@ def resolve_dataset_name_and_id(
     dataset: str | UUID, workspace: Optional[str | UUID] = None
 ) -> Tuple[str, UUID]:
 
-    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
-
-    if _is_valid_uuid(dataset):
-        dataset_id = dataset
-        dataset_name = fabric.resolve_item_name(
-            item_id=dataset_id, type="SemanticModel", workspace=workspace_id
-        )
-    else:
-        dataset_name = dataset
-        dataset_id = fabric.resolve_item_id(
-            item_name=dataset, type="SemanticModel", workspace=workspace_id
-        )
+    (dataset_name, dataset_id) = resolve_item_name_and_id(
+        item=dataset, type="SemanticModel", workspace=workspace
+    )
 
     return dataset_name, dataset_id
@@ -305,15 +297,15 @@ def resolve_lakehouse_name(
 
 
 def resolve_lakehouse_id(
-    lakehouse: str, workspace: Optional[str | UUID] = None
+    lakehouse: Optional[str | UUID] = None, workspace: Optional[str | UUID] = None
 ) -> UUID:
     """
     Obtains the ID of the Fabric lakehouse.
 
     Parameters
     ----------
-    lakehouse : str
-        The name of the Fabric lakehouse.
+    lakehouse : str | uuid.UUID, default=None
+        The name or ID of the Fabric lakehouse.
     workspace : str | uuid.UUID, default=None
         The Fabric workspace name or ID.
         Defaults to None which resolves to the workspace of the attached lakehouse
@@ -325,9 +317,14 @@
         The ID of the Fabric lakehouse.
     """
 
-    return fabric.resolve_item_id(
-        item_name=lakehouse, type="Lakehouse", workspace=workspace
-    )
+    if lakehouse is None:
+        return fabric.get_lakehouse_id()
+    elif _is_valid_uuid(lakehouse):
+        return lakehouse
+    else:
+        return fabric.resolve_item_id(
+            item_name=lakehouse, type="Lakehouse", workspace=workspace
+        )
 
 
 def get_direct_lake_sql_endpoint(
@@ -1047,14 +1044,16 @@ def _get_adls_client(account_name):
     return service_client
 
 
-def resolve_warehouse_id(warehouse: str, workspace: Optional[str | UUID]) -> UUID:
+def resolve_warehouse_id(
+    warehouse: str | UUID, workspace: Optional[str | UUID]
+) -> UUID:
     """
     Obtains the Id for a given warehouse.
 
     Parameters
     ----------
-    warehouse : str
-        The warehouse name
+    warehouse : str | uuid.UUID
+        The warehouse name or ID.
     workspace : str | uuid.UUID, default=None
         The Fabric workspace name or ID in which the semantic model resides.
         Defaults to None which resolves to the workspace of the attached lakehouse
 
     Returns
     -------
-    UUID
+    uuid.UUID
        The warehouse Id.
""" - return fabric.resolve_item_id( - item_name=warehouse, type="Warehouse", workspace=workspace - ) + if _is_valid_uuid(warehouse): + return warehouse + else: + return fabric.resolve_item_id( + item_name=warehouse, type="Warehouse", workspace=workspace + ) def get_language_codes(languages: str | List[str]): diff --git a/src/sempy_labs/_sql.py b/src/sempy_labs/_sql.py index 617d226c..e8f95ed9 100644 --- a/src/sempy_labs/_sql.py +++ b/src/sempy_labs/_sql.py @@ -6,8 +6,8 @@ from itertools import chain, repeat from sempy.fabric.exceptions import FabricHTTPException from sempy_labs._helper_functions import ( - resolve_warehouse_id, - resolve_lakehouse_id, + resolve_lakehouse_name_and_id, + resolve_item_name_and_id, resolve_workspace_name_and_id, ) from uuid import UUID @@ -35,7 +35,7 @@ def _bytes2mswin_bstr(value: bytes) -> bytes: class ConnectBase: def __init__( self, - name: str, + item: str, workspace: Optional[Union[str, UUID]] = None, timeout: Optional[int] = None, endpoint_type: str = "warehouse", @@ -45,11 +45,15 @@ def __init__( (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace) - # Resolve the appropriate ID (warehouse or lakehouse) + # Resolve the appropriate ID and name (warehouse or lakehouse) if endpoint_type == "warehouse": - resource_id = resolve_warehouse_id(warehouse=name, workspace=workspace_id) + (resource_id, resource_name) = resolve_item_name_and_id( + item=item, type=endpoint_type.capitalize(), workspace=workspace_id + ) else: - resource_id = resolve_lakehouse_id(lakehouse=name, workspace=workspace_id) + (resource_id, resource_name) = resolve_lakehouse_name_and_id( + lakehouse=item, workspace=workspace_id + ) # Get the TDS endpoint client = fabric.FabricRestClient() @@ -72,7 +76,7 @@ def __init__( # Set up the connection string access_token = SynapseTokenProvider()() tokenstruct = _bytes2mswin_bstr(access_token.encode()) - conn_str = f"DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={tds_endpoint};DATABASE={name};Encrypt=Yes;" + conn_str = f"DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={tds_endpoint};DATABASE={resource_name};Encrypt=Yes;" if timeout is not None: conn_str += f"Connect Timeout={timeout};" diff --git a/src/sempy_labs/_warehouses.py b/src/sempy_labs/_warehouses.py index e35a683e..9e9c806d 100644 --- a/src/sempy_labs/_warehouses.py +++ b/src/sempy_labs/_warehouses.py @@ -150,15 +150,15 @@ def delete_warehouse(name: str, workspace: Optional[str | UUID] = None): def get_warehouse_tables( - warehouse: str, workspace: Optional[str | UUID] = None + warehouse: str | UUID, workspace: Optional[str | UUID] = None ) -> pd.DataFrame: """ Shows a list of the tables in the Fabric warehouse. This function is based on INFORMATION_SCHEMA.TABLES. Parameters ---------- - warehouse : str - Name of the Fabric warehouse. + warehouse : str | uuid.UUID + Name or ID of the Fabric warehouse. workspace : str | uuid.UUID, default=None The Fabric workspace name or ID. Defaults to None which resolves to the workspace of the attached lakehouse @@ -185,15 +185,15 @@ def get_warehouse_tables( def get_warehouse_columns( - warehouse: str, workspace: Optional[str | UUID] = None + warehouse: str | UUID, workspace: Optional[str | UUID] = None ) -> pd.DataFrame: """ Shows a list of the columns in each table within the Fabric warehouse. This function is based on INFORMATION_SCHEMA.COLUMNS. Parameters ---------- - warehouse : str - Name of the Fabric warehouse. + warehouse : str | uuid.UUID + Name or ID of the Fabric warehouse. 
workspace : str | uuid.UUID, default=None The Fabric workspace name or ID. Defaults to None which resolves to the workspace of the attached lakehouse From 5aff7de64fba72b66c2246e6270c869d3145ea45 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 30 Dec 2024 11:11:58 +0200 Subject: [PATCH 3/7] added _split_abfss_path, _get_default_file_path --- src/sempy_labs/_helper_functions.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/sempy_labs/_helper_functions.py b/src/sempy_labs/_helper_functions.py index bbc5c486..1efbf767 100644 --- a/src/sempy_labs/_helper_functions.py +++ b/src/sempy_labs/_helper_functions.py @@ -52,6 +52,24 @@ def create_abfss_path( return f"abfss://{lakehouse_workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{delta_table_name}" +def _get_default_file_path() -> str: + + default_file_storage = _get_fabric_context_setting(name="fs.defaultFS") + + return default_file_storage.split("@")[-1][:-1] + + +def _split_abfss_path(path: str) -> Tuple[UUID, UUID, str]: + + # Extracts the workspace ID, item ID and delta table name from the abfss path. + + workspace_id = path.split("abfss://")[1].split("@")[0] + item_id = path.split(".com/")[1].split("/")[0] + delta_table_name = path.split("/")[-1] + + return workspace_id, item_id, delta_table_name + + def format_dax_object_name(table: str, column: str) -> str: """ Formats a table/column combination to the 'Table Name'[Column Name] format. From 87415d9927b447ad9f60715e00db24fbd2993be7 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 30 Dec 2024 12:07:07 +0200 Subject: [PATCH 4/7] support ID for lakehouse parameter --- .../lakehouse/_get_lakehouse_columns.py | 39 ++++++-------- .../lakehouse/_get_lakehouse_tables.py | 52 +++++++------------ 2 files changed, 37 insertions(+), 54 deletions(-) diff --git a/src/sempy_labs/lakehouse/_get_lakehouse_columns.py b/src/sempy_labs/lakehouse/_get_lakehouse_columns.py index 7d3c2b2a..92f6e18a 100644 --- a/src/sempy_labs/lakehouse/_get_lakehouse_columns.py +++ b/src/sempy_labs/lakehouse/_get_lakehouse_columns.py @@ -1,11 +1,9 @@ -import sempy.fabric as fabric import pandas as pd from pyspark.sql import SparkSession from sempy_labs._helper_functions import ( - resolve_lakehouse_name, format_dax_object_name, - resolve_lakehouse_id, resolve_workspace_name_and_id, + resolve_lakehouse_name_and_id, ) from typing import Optional from sempy._utils._log import log @@ -14,15 +12,15 @@ @log def get_lakehouse_columns( - lakehouse: Optional[str] = None, workspace: Optional[str | UUID] = None + lakehouse: Optional[str | UUID] = None, workspace: Optional[str | UUID] = None ) -> pd.DataFrame: """ Shows the tables and columns of a lakehouse and their respective properties. Parameters ---------- - lakehouse : str, default=None - The Fabric lakehouse. + lakehouse : str | uuid.UUID, default=None + The Fabric lakehouse name or ID. Defaults to None which resolves to the lakehouse attached to the notebook. lakehouse_workspace : str | uuid.UUID, default=None The Fabric workspace name or ID used by the lakehouse. 
@@ -49,34 +47,31 @@ def get_lakehouse_columns(
     )
 
     (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
-
-    if lakehouse is None:
-        lakehouse_id = fabric.get_lakehouse_id()
-        lakehouse = resolve_lakehouse_name(lakehouse_id, workspace_id)
-    else:
-        lakehouse_id = resolve_lakehouse_id(lakehouse, workspace_id)
+    (lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id(
+        lakehouse=lakehouse, workspace=workspace_id
+    )
 
     spark = SparkSession.builder.getOrCreate()
 
     tables = get_lakehouse_tables(
-        lakehouse=lakehouse, workspace=workspace_id, extended=False, count_rows=False
+        lakehouse=lakehouse_id, workspace=workspace_id, extended=False, count_rows=False
     )
     tables_filt = tables[tables["Format"] == "delta"]
 
-    for i, r in tables_filt.iterrows():
-        tName = r["Table Name"]
-        tPath = r["Location"]
-        delta_table = DeltaTable.forPath(spark, tPath)
+    for _, r in tables_filt.iterrows():
+        table_name = r["Table Name"]
+        path = r["Location"]
+        delta_table = DeltaTable.forPath(spark, path)
         sparkdf = delta_table.toDF()
 
-        for cName, data_type in sparkdf.dtypes:
-            tc = format_dax_object_name(tName, cName)
+        for col_name, data_type in sparkdf.dtypes:
+            full_column_name = format_dax_object_name(table_name, col_name)
             new_data = {
                 "Workspace Name": workspace_name,
-                "Lakehouse Name": lakehouse,
-                "Table Name": tName,
-                "Column Name": cName,
-                "Full Column Name": tc,
+                "Lakehouse Name": lakehouse_name,
+                "Table Name": table_name,
+                "Column Name": col_name,
+                "Full Column Name": full_column_name,
                 "Data Type": data_type,
             }
             df = pd.concat([df, pd.DataFrame(new_data, index=[0])], ignore_index=True)
diff --git a/src/sempy_labs/lakehouse/_get_lakehouse_tables.py b/src/sempy_labs/lakehouse/_get_lakehouse_tables.py
index bea62085..2f9eaf52 100644
--- a/src/sempy_labs/lakehouse/_get_lakehouse_tables.py
+++ b/src/sempy_labs/lakehouse/_get_lakehouse_tables.py
@@ -4,10 +4,11 @@
 import pyarrow.parquet as pq
 import datetime
 from sempy_labs._helper_functions import (
-    resolve_lakehouse_id,
-    resolve_lakehouse_name,
+    _get_max_run_id,
     resolve_workspace_name_and_id,
+    resolve_lakehouse_name_and_id,
     pagination,
+    save_as_delta_table,
 )
 from sempy_labs.directlake._guardrails import (
     get_sku_size,
@@ -23,7 +24,7 @@
 
 @log
 def get_lakehouse_tables(
-    lakehouse: Optional[str] = None,
+    lakehouse: Optional[str | UUID] = None,
     workspace: Optional[str | UUID] = None,
     extended: bool = False,
     count_rows: bool = False,
@@ -36,8 +37,8 @@
 
     Parameters
     ----------
-    lakehouse : str, default=None
-        The Fabric lakehouse.
+    lakehouse : str | uuid.UUID, default=None
+        The Fabric lakehouse name or ID.
         Defaults to None which resolves to the lakehouse attached to the notebook.
     workspace : str | uuid.UUID, default=None
         The Fabric workspace name or ID used by the lakehouse.
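A quick usage sketch of the widened get_lakehouse_tables signature (the lakehouse name below is a placeholder; this assumes a Fabric notebook with an attached lakehouse):

    from sempy_labs._helper_functions import resolve_lakehouse_name_and_id
    from sempy_labs.lakehouse._get_lakehouse_tables import get_lakehouse_tables

    # Pass the lakehouse by name; count_rows=True implicitly switches on the extended properties.
    df = get_lakehouse_tables(lakehouse="MyLakehouse", count_rows=True)

    # Or pass the lakehouse ID, here resolved from the notebook's attached lakehouse.
    _, lakehouse_id = resolve_lakehouse_name_and_id()
    df = get_lakehouse_tables(lakehouse=lakehouse_id)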
@@ -68,12 +69,9 @@ def get_lakehouse_tables( ) (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace) - - if lakehouse is None: - lakehouse_id = fabric.get_lakehouse_id() - lakehouse = resolve_lakehouse_name(lakehouse_id, workspace_id) - else: - lakehouse_id = resolve_lakehouse_id(lakehouse, workspace_id) + (lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id( + lakehouse=lakehouse, workspace=workspace_id + ) if count_rows: # Setting countrows defaults to extended=True extended = True @@ -106,7 +104,7 @@ def get_lakehouse_tables( for i in r.get("data", []): new_data = { "Workspace Name": workspace_name, - "Lakehouse Name": lakehouse, + "Lakehouse Name": lakehouse_name, "Table Name": i.get("name"), "Format": i.get("format"), "Type": i.get("type"), @@ -179,23 +177,17 @@ def get_lakehouse_tables( f"{icons.red_dot} In order to save the report.json file, a lakehouse must be attached to the notebook. Please attach a lakehouse to this notebook." ) - spark = SparkSession.builder.getOrCreate() - - lakehouse_id = fabric.get_lakehouse_id() - lakehouse = resolve_lakehouse_name( - lakehouse_id=lakehouse_id, workspace=workspace_id - ) + (current_lakehouse_name, current_lakehouse_id) = resolve_lakehouse_name_and_id() lakeTName = "lakehouse_table_details" lakeT_filt = df[df["Table Name"] == lakeTName] - query = f"SELECT MAX(RunId) FROM {lakehouse}.{lakeTName}" - if len(lakeT_filt) == 0: - runId = 1 + run_id = 1 else: - dfSpark = spark.sql(query) - maxRunId = dfSpark.collect()[0][0] - runId = maxRunId + 1 + max_run_id = _get_max_run_id( + lakehouse=current_lakehouse_name, table_name=lakeTName + ) + run_id = max_run_id + 1 export_df = df.copy() @@ -240,15 +232,11 @@ def get_lakehouse_tables( print( f"{icons.in_progress} Saving Lakehouse table properties to the '{lakeTName}' table in the lakehouse...\n" ) - now = datetime.datetime.now() - export_df["Timestamp"] = now - export_df["RunId"] = runId + export_df["Timestamp"] = datetime.datetime.now() + export_df["RunId"] = run_id - export_df.columns = export_df.columns.str.replace(" ", "_") - spark_df = spark.createDataFrame(export_df) - spark_df.write.mode("append").format("delta").saveAsTable(lakeTName) - print( - f"{icons.bullet} Lakehouse table properties have been saved to the '{lakeTName}' delta table." 
+ save_as_delta_table( + dataframe=export_df, delta_table_name=lakeTName, write_mode="append" ) return df From a3fc2ec24d2bf834edd6fc95786e62003fdf0d06 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 9 Jan 2025 10:51:54 +0200 Subject: [PATCH 5/7] removed from init --- src/sempy_labs/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sempy_labs/__init__.py b/src/sempy_labs/__init__.py index 0e8f9307..423563fe 100644 --- a/src/sempy_labs/__init__.py +++ b/src/sempy_labs/__init__.py @@ -1,5 +1,4 @@ from sempy_labs._job_scheduler import list_item_job_instances -from sempy_labs._optimize import optimize_delta_tables from sempy_labs._gateways import ( list_gateway_members, list_gateway_role_assigments, From f10ccaf392503bc0fa2ee936bd67e128b5748774 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 9 Jan 2025 10:52:19 +0200 Subject: [PATCH 6/7] removed --- src/sempy_labs/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sempy_labs/__init__.py b/src/sempy_labs/__init__.py index 423563fe..da57e489 100644 --- a/src/sempy_labs/__init__.py +++ b/src/sempy_labs/__init__.py @@ -470,5 +470,4 @@ "bind_semantic_model_to_gateway", "list_semantic_model_errors", "list_item_job_instances", - "optimize_delta_tables", ] From fa5fa14b9e98d7430c224aa9c4c23ac11d05fa34 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 9 Jan 2025 21:28:28 +0200 Subject: [PATCH 7/7] fixed per comments --- src/sempy_labs/_helper_functions.py | 8 +-- src/sempy_labs/_optimize.py | 80 ----------------------------- 2 files changed, 4 insertions(+), 84 deletions(-) delete mode 100644 src/sempy_labs/_optimize.py diff --git a/src/sempy_labs/_helper_functions.py b/src/sempy_labs/_helper_functions.py index 1efbf767..170d20c4 100644 --- a/src/sempy_labs/_helper_functions.py +++ b/src/sempy_labs/_helper_functions.py @@ -61,11 +61,11 @@ def _get_default_file_path() -> str: def _split_abfss_path(path: str) -> Tuple[UUID, UUID, str]: - # Extracts the workspace ID, item ID and delta table name from the abfss path. + parsed_url = urllib.parse.urlparse(path) - workspace_id = path.split("abfss://")[1].split("@")[0] - item_id = path.split(".com/")[1].split("/")[0] - delta_table_name = path.split("/")[-1] + workspace_id = parsed_url.netloc.split("@")[0] + item_id = parsed_url.path.lstrip("/").split("/")[0] + delta_table_name = parsed_url.path.split("/")[-1] return workspace_id, item_id, delta_table_name diff --git a/src/sempy_labs/_optimize.py b/src/sempy_labs/_optimize.py deleted file mode 100644 index 5537fe06..00000000 --- a/src/sempy_labs/_optimize.py +++ /dev/null @@ -1,80 +0,0 @@ -import pandas as pd -from typing import Optional, Union, List -from uuid import UUID -from sempy_labs._helper_functions import ( - resolve_workspace_name_and_id, - resolve_lakehouse_name_and_id, - resolve_item_name_and_id, - create_abfss_path, -) -from tqdm.auto import tqdm - - -def optimize_delta_tables( - tables: Optional[Union[str, List[str]]] = None, - source: Optional[str | UUID] = None, - source_type: str = "Lakehouse", - workspace: Optional[str | UUID] = None, -): - """ - Runs the `OPTIMIZE `_ function over the specified delta tables. - - Parameters - ---------- - tables : str | List[str], default=None - The table(s) to optimize. - Defaults to None which resovles to optimizing all tables within the lakehouse. - source : str | uuid.UUID, default=None - The source location of the delta table (i.e. lakehouse). - Defaults to None which resolves to the lakehouse attached to the notebook. 
- source_type : str, default="Lakehouse" - The source type (i.e. "Lakehouse", "SemanticModel") - workspace : str | uuid.UUID, default=None - The Fabric workspace name or ID used by the lakehouse. - Defaults to None which resolves to the workspace of the attached lakehouse - or if no lakehouse attached, resolves to the workspace of the notebook. - """ - - from pyspark.sql import SparkSession - from sempy_labs.lakehouse._get_lakehouse_tables import get_lakehouse_tables - from delta import DeltaTable - - (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace) - - if source is None: - (item_name, item_id) = resolve_lakehouse_name_and_id() - else: - (item_name, item_id) = resolve_item_name_and_id( - item=source, type=source_type, workspace=workspace_id - ) - - if isinstance(tables, str): - tables = [tables] - - if source_type == "Lakehouse": - dfL = get_lakehouse_tables(lakehouse=item_name, workspace=workspace_id) - dfL_delta = dfL[dfL["Format"] == "delta"] - - if tables is not None: - delta_tables = dfL_delta[dfL_delta["Table Name"].isin(tables)] - else: - delta_tables = dfL_delta.copy() - else: - data = [] - for t in tables: - new_data = { - "Table Name": t, - "Location": create_abfss_path(workspace_id, item_id, t), - } - data.append(new_data) - - delta_tables = pd.DataFrame(data) - - spark = SparkSession.builder.getOrCreate() - - for _, r in (bar := tqdm(delta_tables.iterrows())): - tableName = r["Table Name"] - tablePath = r["Location"] - bar.set_description(f"Optimizing the '{tableName}' table...") - deltaTable = DeltaTable.forPath(spark, tablePath) - deltaTable.optimize().executeCompaction()
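For reference, a short usage sketch covering the other entry points this series touches, assuming a Fabric notebook with an attached lakehouse. The table name, the schema type keys ("string", "long") and the placeholder abfss path are illustrative assumptions, not values confirmed by the patches above.

    import pandas as pd
    from sempy_labs._helper_functions import save_as_delta_table, _split_abfss_path
    from sempy_labs.lakehouse._lakehouse import optimize_lakehouse_tables

    # Write a pandas DataFrame as a delta table; the schema dict assumes "string" and "long"
    # are valid keys of the library's internal type mapping.
    df = pd.DataFrame({"Country": ["US", "DE"], "Population": [331, 83]})
    save_as_delta_table(
        dataframe=df,
        delta_table_name="country_population",
        write_mode="overwrite",
        schema={"Country": "string", "Population": "long"},
    )

    # Compact one (or, with no arguments, all) delta tables in the attached lakehouse.
    optimize_lakehouse_tables(tables="country_population")

    # Split an abfss table path back into its workspace ID, item ID and table name.
    ws_id, item_id, table_name = _split_abfss_path(
        "abfss://<workspace-id>@onelake.dfs.fabric.microsoft.com/<lakehouse-id>/Tables/country_population"
    )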