diff --git a/src/sempy_labs/_helper_functions.py b/src/sempy_labs/_helper_functions.py
index 64b33f45..170d20c4 100644
--- a/src/sempy_labs/_helper_functions.py
+++ b/src/sempy_labs/_helper_functions.py
@@ -52,6 +52,24 @@ def create_abfss_path(
     return f"abfss://{lakehouse_workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{delta_table_name}"
 
 
+def _get_default_file_path() -> str:
+
+    default_file_storage = _get_fabric_context_setting(name="fs.defaultFS")
+
+    return default_file_storage.split("@")[-1][:-1]
+
+
+def _split_abfss_path(path: str) -> Tuple[UUID, UUID, str]:
+
+    parsed_url = urllib.parse.urlparse(path)
+
+    workspace_id = parsed_url.netloc.split("@")[0]
+    item_id = parsed_url.path.lstrip("/").split("/")[0]
+    delta_table_name = parsed_url.path.split("/")[-1]
+
+    return workspace_id, item_id, delta_table_name
+
+
 def format_dax_object_name(table: str, column: str) -> str:
     """
     Formats a table/column combination to the 'Table Name'[Column Name] format.
@@ -172,23 +190,40 @@ def resolve_item_name_and_id(
     return item_name, item_id
 
 
-def resolve_dataset_name_and_id(
-    dataset: str | UUID, workspace: Optional[str | UUID] = None
+def resolve_lakehouse_name_and_id(
+    lakehouse: Optional[str | UUID] = None, workspace: Optional[str | UUID] = None
 ) -> Tuple[str, UUID]:
 
     (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
+    type = "Lakehouse"
 
-    if _is_valid_uuid(dataset):
-        dataset_id = dataset
-        dataset_name = fabric.resolve_item_name(
-            item_id=dataset_id, type="SemanticModel", workspace=workspace_id
+    if lakehouse is None:
+        lakehouse_id = fabric.get_lakehouse_id()
+        lakehouse_name = fabric.resolve_item_name(
+            item_id=lakehouse_id, type=type, workspace=workspace_id
+        )
+    elif _is_valid_uuid(lakehouse):
+        lakehouse_id = lakehouse
+        lakehouse_name = fabric.resolve_item_name(
+            item_id=lakehouse_id, type=type, workspace=workspace_id
         )
     else:
-        dataset_name = dataset
-        dataset_id = fabric.resolve_item_id(
-            item_name=dataset, type="SemanticModel", workspace=workspace_id
+        lakehouse_name = lakehouse
+        lakehouse_id = fabric.resolve_item_id(
+            item_name=lakehouse, type=type, workspace=workspace_id
         )
 
+    return lakehouse_name, lakehouse_id
+
+
+def resolve_dataset_name_and_id(
+    dataset: str | UUID, workspace: Optional[str | UUID] = None
+) -> Tuple[str, UUID]:
+
+    (dataset_name, dataset_id) = resolve_item_name_and_id(
+        item=dataset, type="SemanticModel", workspace=workspace
+    )
+
     return dataset_name, dataset_id
@@ -280,15 +315,15 @@ def resolve_lakehouse_name(
 
 
 def resolve_lakehouse_id(
-    lakehouse: str, workspace: Optional[str | UUID] = None
+    lakehouse: Optional[str | UUID] = None, workspace: Optional[str | UUID] = None
 ) -> UUID:
     """
     Obtains the ID of the Fabric lakehouse.
 
     Parameters
     ----------
-    lakehouse : str
-        The name of the Fabric lakehouse.
+    lakehouse : str | uuid.UUID, default=None
+        The name or ID of the Fabric lakehouse.
     workspace : str | uuid.UUID, default=None
         The Fabric workspace name or ID.
         Defaults to None which resolves to the workspace of the attached lakehouse
@@ -300,9 +335,14 @@
         The ID of the Fabric lakehouse.
     """
 
-    return fabric.resolve_item_id(
-        item_name=lakehouse, type="Lakehouse", workspace=workspace
-    )
+    if lakehouse is None:
+        return fabric.get_lakehouse_id()
+    elif _is_valid_uuid(lakehouse):
+        return lakehouse
+    else:
+        return fabric.resolve_item_id(
+            item_name=lakehouse, type="Lakehouse", workspace=workspace
+        )
 
 
 def get_direct_lake_sql_endpoint(
@@ -426,7 +466,7 @@ def save_as_delta_table(
     write_mode: str,
     merge_schema: bool = False,
     schema: Optional[dict] = None,
-    lakehouse: Optional[str] = None,
+    lakehouse: Optional[str | UUID] = None,
     workspace: Optional[str | UUID] = None,
 ):
     """
@@ -444,8 +484,8 @@ def save_as_delta_table(
         Merges the schemas of the dataframe to the delta table.
     schema : dict, default=None
         A dictionary showing the schema of the columns and their data types.
-    lakehouse : str, default=None
-        The Fabric lakehouse used by the Direct Lake semantic model.
+    lakehouse : str | uuid.UUID, default=None
+        The Fabric lakehouse name or ID.
         Defaults to None which resolves to the lakehouse attached to the notebook.
     workspace : str | uuid.UUID, default=None
         The Fabric workspace name or ID.
@@ -468,21 +508,16 @@ def save_as_delta_table(
     )
 
     (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
+    (lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id(
+        lakehouse=lakehouse, workspace=workspace_id
+    )
 
-    if lakehouse is None:
-        lakehouse_id = fabric.get_lakehouse_id()
-        lakehouse = resolve_lakehouse_name(
-            lakehouse_id=lakehouse_id, workspace=workspace_id
-        )
-    else:
-        lakehouse_id = resolve_lakehouse_id(lakehouse, workspace_id)
-
-    writeModes = ["append", "overwrite"]
+    write_modes = ["append", "overwrite"]
     write_mode = write_mode.lower()
 
-    if write_mode not in writeModes:
+    if write_mode not in write_modes:
         raise ValueError(
-            f"{icons.red_dot} Invalid 'write_type' parameter. Choose from one of the following values: {writeModes}."
+            f"{icons.red_dot} Invalid 'write_type' parameter. Choose from one of the following values: {write_modes}."
         )
 
     if " " in delta_table_name:
@@ -507,16 +542,19 @@ def save_as_delta_table(
         "timestamp": TimestampType(),
     }
 
-    if schema is None:
-        spark_df = spark.createDataFrame(dataframe)
+    if isinstance(dataframe, pd.DataFrame):
+        if schema is None:
+            spark_df = spark.createDataFrame(dataframe)
+        else:
+            schema_map = StructType(
+                [
+                    StructField(column_name, type_mapping[data_type], True)
+                    for column_name, data_type in schema.items()
+                ]
+            )
+            spark_df = spark.createDataFrame(dataframe, schema_map)
     else:
-        schema_map = StructType(
-            [
-                StructField(column_name, type_mapping[data_type], True)
-                for column_name, data_type in schema.items()
-            ]
-        )
-        spark_df = spark.createDataFrame(dataframe, schema_map)
+        spark_df = dataframe
 
     filePath = create_abfss_path(
         lakehouse_id=lakehouse_id,
@@ -531,7 +569,7 @@ def save_as_delta_table(
     else:
         spark_df.write.mode(write_mode).format("delta").save(filePath)
     print(
-        f"{icons.green_dot} The dataframe has been saved as the '{delta_table_name}' table in the '{lakehouse}' lakehouse within the '{workspace_name}' workspace."
+        f"{icons.green_dot} The dataframe has been saved as the '{delta_table_name}' table in the '{lakehouse_name}' lakehouse within the '{workspace_name}' workspace."
     )
 
 
@@ -1024,14 +1062,16 @@ def _get_adls_client(account_name):
     return service_client
 
 
-def resolve_warehouse_id(warehouse: str, workspace: Optional[str | UUID]) -> UUID:
+def resolve_warehouse_id(
+    warehouse: str | UUID, workspace: Optional[str | UUID]
+) -> UUID:
     """
     Obtains the Id for a given warehouse.
 
     Parameters
     ----------
-    warehouse : str
-        The warehouse name
+    warehouse : str | uuid.UUID
+        The warehouse name or ID.
    workspace : str | uuid.UUID, default=None
        The Fabric workspace name or ID in which the semantic model resides.
        Defaults to None which resolves to the workspace of the attached lakehouse
@@ -1039,13 +1079,16 @@ def resolve_warehouse_id(warehouse: str, workspace: Optional[str | UUID]) -> UUI
 
     Returns
     -------
-    UUID
+    uuid.UUID
         The warehouse Id.
     """
 
-    return fabric.resolve_item_id(
-        item_name=warehouse, type="Warehouse", workspace=workspace
-    )
+    if _is_valid_uuid(warehouse):
+        return warehouse
+    else:
+        return fabric.resolve_item_id(
+            item_name=warehouse, type="Warehouse", workspace=workspace
+        )
 
 
 def get_language_codes(languages: str | List[str]):
diff --git a/src/sempy_labs/_sql.py b/src/sempy_labs/_sql.py
index 617d226c..e8f95ed9 100644
--- a/src/sempy_labs/_sql.py
+++ b/src/sempy_labs/_sql.py
@@ -6,8 +6,8 @@
 from itertools import chain, repeat
 from sempy.fabric.exceptions import FabricHTTPException
 from sempy_labs._helper_functions import (
-    resolve_warehouse_id,
-    resolve_lakehouse_id,
+    resolve_lakehouse_name_and_id,
+    resolve_item_name_and_id,
     resolve_workspace_name_and_id,
 )
 from uuid import UUID
@@ -35,7 +35,7 @@ def _bytes2mswin_bstr(value: bytes) -> bytes:
 class ConnectBase:
     def __init__(
         self,
-        name: str,
+        item: str,
         workspace: Optional[Union[str, UUID]] = None,
         timeout: Optional[int] = None,
         endpoint_type: str = "warehouse",
@@ -45,11 +45,15 @@ def __init__(
 
         (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
 
-        # Resolve the appropriate ID (warehouse or lakehouse)
+        # Resolve the appropriate ID and name (warehouse or lakehouse)
         if endpoint_type == "warehouse":
-            resource_id = resolve_warehouse_id(warehouse=name, workspace=workspace_id)
+            (resource_name, resource_id) = resolve_item_name_and_id(
+                item=item, type=endpoint_type.capitalize(), workspace=workspace_id
+            )
         else:
-            resource_id = resolve_lakehouse_id(lakehouse=name, workspace=workspace_id)
+            (resource_name, resource_id) = resolve_lakehouse_name_and_id(
+                lakehouse=item, workspace=workspace_id
+            )
 
         # Get the TDS endpoint
         client = fabric.FabricRestClient()
@@ -72,7 +76,7 @@ def __init__(
         # Set up the connection string
         access_token = SynapseTokenProvider()()
         tokenstruct = _bytes2mswin_bstr(access_token.encode())
-        conn_str = f"DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={tds_endpoint};DATABASE={name};Encrypt=Yes;"
+        conn_str = f"DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={tds_endpoint};DATABASE={resource_name};Encrypt=Yes;"
 
         if timeout is not None:
             conn_str += f"Connect Timeout={timeout};"
diff --git a/src/sempy_labs/_warehouses.py b/src/sempy_labs/_warehouses.py
index e35a683e..9e9c806d 100644
--- a/src/sempy_labs/_warehouses.py
+++ b/src/sempy_labs/_warehouses.py
@@ -150,15 +150,15 @@ def delete_warehouse(name: str, workspace: Optional[str | UUID] = None):
 
 
 def get_warehouse_tables(
-    warehouse: str, workspace: Optional[str | UUID] = None
+    warehouse: str | UUID, workspace: Optional[str | UUID] = None
 ) -> pd.DataFrame:
     """
     Shows a list of the tables in the Fabric warehouse. This function is based on INFORMATION_SCHEMA.TABLES.
 
     Parameters
     ----------
-    warehouse : str
-        Name of the Fabric warehouse.
+    warehouse : str | uuid.UUID
+        Name or ID of the Fabric warehouse.
     workspace : str | uuid.UUID, default=None
         The Fabric workspace name or ID.
         Defaults to None which resolves to the workspace of the attached lakehouse
@@ -185,15 +185,15 @@ def get_warehouse_tables(
 
 
 def get_warehouse_columns(
-    warehouse: str, workspace: Optional[str | UUID] = None
+    warehouse: str | UUID, workspace: Optional[str | UUID] = None
 ) -> pd.DataFrame:
     """
     Shows a list of the columns in each table within the Fabric warehouse. This function is based on INFORMATION_SCHEMA.COLUMNS.
 
     Parameters
     ----------
-    warehouse : str
-        Name of the Fabric warehouse.
+    warehouse : str | uuid.UUID
+        Name or ID of the Fabric warehouse.
     workspace : str | uuid.UUID, default=None
         The Fabric workspace name or ID.
         Defaults to None which resolves to the workspace of the attached lakehouse
diff --git a/src/sempy_labs/lakehouse/_get_lakehouse_columns.py b/src/sempy_labs/lakehouse/_get_lakehouse_columns.py
index 7d3c2b2a..92f6e18a 100644
--- a/src/sempy_labs/lakehouse/_get_lakehouse_columns.py
+++ b/src/sempy_labs/lakehouse/_get_lakehouse_columns.py
@@ -1,11 +1,9 @@
-import sempy.fabric as fabric
 import pandas as pd
 from pyspark.sql import SparkSession
 from sempy_labs._helper_functions import (
-    resolve_lakehouse_name,
     format_dax_object_name,
-    resolve_lakehouse_id,
     resolve_workspace_name_and_id,
+    resolve_lakehouse_name_and_id,
 )
 from typing import Optional
 from sempy._utils._log import log
@@ -14,15 +12,15 @@
 
 @log
 def get_lakehouse_columns(
-    lakehouse: Optional[str] = None, workspace: Optional[str | UUID] = None
+    lakehouse: Optional[str | UUID] = None, workspace: Optional[str | UUID] = None
 ) -> pd.DataFrame:
     """
     Shows the tables and columns of a lakehouse and their respective properties.
 
     Parameters
     ----------
-    lakehouse : str, default=None
-        The Fabric lakehouse.
+    lakehouse : str | uuid.UUID, default=None
+        The Fabric lakehouse name or ID.
         Defaults to None which resolves to the lakehouse attached to the notebook.
     lakehouse_workspace : str | uuid.UUID, default=None
         The Fabric workspace name or ID used by the lakehouse.
@@ -49,34 +47,31 @@ def get_lakehouse_columns(
     )
 
     (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
-
-    if lakehouse is None:
-        lakehouse_id = fabric.get_lakehouse_id()
-        lakehouse = resolve_lakehouse_name(lakehouse_id, workspace_id)
-    else:
-        lakehouse_id = resolve_lakehouse_id(lakehouse, workspace_id)
+    (lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id(
+        lakehouse=lakehouse, workspace=workspace_id
+    )
 
     spark = SparkSession.builder.getOrCreate()
 
     tables = get_lakehouse_tables(
-        lakehouse=lakehouse, workspace=workspace_id, extended=False, count_rows=False
+        lakehouse=lakehouse_id, workspace=workspace_id, extended=False, count_rows=False
     )
     tables_filt = tables[tables["Format"] == "delta"]
 
-    for i, r in tables_filt.iterrows():
-        tName = r["Table Name"]
-        tPath = r["Location"]
-        delta_table = DeltaTable.forPath(spark, tPath)
+    for _, r in tables_filt.iterrows():
+        table_name = r["Table Name"]
+        path = r["Location"]
+        delta_table = DeltaTable.forPath(spark, path)
         sparkdf = delta_table.toDF()
 
-        for cName, data_type in sparkdf.dtypes:
-            tc = format_dax_object_name(tName, cName)
+        for col_name, data_type in sparkdf.dtypes:
+            full_column_name = format_dax_object_name(table_name, col_name)
             new_data = {
                 "Workspace Name": workspace_name,
-                "Lakehouse Name": lakehouse,
-                "Table Name": tName,
-                "Column Name": cName,
-                "Full Column Name": tc,
+                "Lakehouse Name": lakehouse_name,
+                "Table Name": table_name,
+                "Column Name": col_name,
+                "Full Column Name": full_column_name,
                 "Data Type": data_type,
             }
             df = pd.concat([df, pd.DataFrame(new_data, index=[0])], ignore_index=True)
diff --git a/src/sempy_labs/lakehouse/_get_lakehouse_tables.py b/src/sempy_labs/lakehouse/_get_lakehouse_tables.py
index bea62085..2f9eaf52 100644
--- a/src/sempy_labs/lakehouse/_get_lakehouse_tables.py
+++ b/src/sempy_labs/lakehouse/_get_lakehouse_tables.py
@@ -4,10 +4,11 @@
 import pyarrow.parquet as pq
 import datetime
 from sempy_labs._helper_functions import (
-    resolve_lakehouse_id,
-    resolve_lakehouse_name,
+    _get_max_run_id,
     resolve_workspace_name_and_id,
+    resolve_lakehouse_name_and_id,
     pagination,
+    save_as_delta_table,
 )
 from sempy_labs.directlake._guardrails import (
     get_sku_size,
@@ -23,7 +24,7 @@
 
 @log
 def get_lakehouse_tables(
-    lakehouse: Optional[str] = None,
+    lakehouse: Optional[str | UUID] = None,
     workspace: Optional[str | UUID] = None,
     extended: bool = False,
     count_rows: bool = False,
@@ -36,8 +37,8 @@
 
     Parameters
     ----------
-    lakehouse : str, default=None
-        The Fabric lakehouse.
+    lakehouse : str | uuid.UUID, default=None
+        The Fabric lakehouse name or ID.
         Defaults to None which resolves to the lakehouse attached to the notebook.
     workspace : str | uuid.UUID, default=None
         The Fabric workspace name or ID used by the lakehouse.
@@ -68,12 +69,9 @@ def get_lakehouse_tables(
     )
 
     (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
-
-    if lakehouse is None:
-        lakehouse_id = fabric.get_lakehouse_id()
-        lakehouse = resolve_lakehouse_name(lakehouse_id, workspace_id)
-    else:
-        lakehouse_id = resolve_lakehouse_id(lakehouse, workspace_id)
+    (lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id(
+        lakehouse=lakehouse, workspace=workspace_id
+    )
 
     if count_rows:  # Setting countrows defaults to extended=True
         extended = True
@@ -106,7 +104,7 @@ def get_lakehouse_tables(
         for i in r.get("data", []):
             new_data = {
                 "Workspace Name": workspace_name,
-                "Lakehouse Name": lakehouse,
+                "Lakehouse Name": lakehouse_name,
                 "Table Name": i.get("name"),
                 "Format": i.get("format"),
                 "Type": i.get("type"),
@@ -179,23 +177,17 @@ def get_lakehouse_tables(
             f"{icons.red_dot} In order to save the report.json file, a lakehouse must be attached to the notebook. Please attach a lakehouse to this notebook."
         )
 
-    spark = SparkSession.builder.getOrCreate()
-
-    lakehouse_id = fabric.get_lakehouse_id()
-    lakehouse = resolve_lakehouse_name(
-        lakehouse_id=lakehouse_id, workspace=workspace_id
-    )
+    (current_lakehouse_name, current_lakehouse_id) = resolve_lakehouse_name_and_id()
 
     lakeTName = "lakehouse_table_details"
     lakeT_filt = df[df["Table Name"] == lakeTName]
 
-    query = f"SELECT MAX(RunId) FROM {lakehouse}.{lakeTName}"
-
     if len(lakeT_filt) == 0:
-        runId = 1
+        run_id = 1
     else:
-        dfSpark = spark.sql(query)
-        maxRunId = dfSpark.collect()[0][0]
-        runId = maxRunId + 1
+        max_run_id = _get_max_run_id(
+            lakehouse=current_lakehouse_name, table_name=lakeTName
+        )
+        run_id = max_run_id + 1
 
     export_df = df.copy()
@@ -240,15 +232,11 @@ def get_lakehouse_tables(
     print(
         f"{icons.in_progress} Saving Lakehouse table properties to the '{lakeTName}' table in the lakehouse...\n"
     )
-    now = datetime.datetime.now()
-    export_df["Timestamp"] = now
-    export_df["RunId"] = runId
+    export_df["Timestamp"] = datetime.datetime.now()
+    export_df["RunId"] = run_id
 
-    export_df.columns = export_df.columns.str.replace(" ", "_")
-    spark_df = spark.createDataFrame(export_df)
-    spark_df.write.mode("append").format("delta").saveAsTable(lakeTName)
-    print(
-        f"{icons.bullet} Lakehouse table properties have been saved to the '{lakeTName}' delta table."
+    save_as_delta_table(
+        dataframe=export_df, delta_table_name=lakeTName, write_mode="append"
     )
 
     return df
diff --git a/src/sempy_labs/lakehouse/_lakehouse.py b/src/sempy_labs/lakehouse/_lakehouse.py
index 85202c12..ac27f1eb 100644
--- a/src/sempy_labs/lakehouse/_lakehouse.py
+++ b/src/sempy_labs/lakehouse/_lakehouse.py
@@ -52,35 +52,11 @@ def optimize_lakehouse_tables(
         or if no lakehouse attached, resolves to the workspace of the notebook.
     """
 
-    from pyspark.sql import SparkSession
-    from sempy_labs.lakehouse._get_lakehouse_tables import get_lakehouse_tables
-    from delta import DeltaTable
-
-    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
-
-    if lakehouse is None:
-        lakehouse_id = fabric.get_lakehouse_id()
-        lakehouse = resolve_lakehouse_name(lakehouse_id, workspace_id)
-
-    lakeTables = get_lakehouse_tables(lakehouse=lakehouse, workspace=workspace_id)
-    lakeTablesDelta = lakeTables[lakeTables["Format"] == "delta"]
-
-    if isinstance(tables, str):
-        tables = [tables]
-
-    if tables is not None:
-        tables_filt = lakeTablesDelta[lakeTablesDelta["Table Name"].isin(tables)]
-    else:
-        tables_filt = lakeTablesDelta.copy()
+    from sempy_labs._optimize import optimize_delta_tables
 
-    spark = SparkSession.builder.getOrCreate()
-
-    for _, r in (bar := tqdm(tables_filt.iterrows())):
-        tableName = r["Table Name"]
-        tablePath = r["Location"]
-        bar.set_description(f"Optimizing the '{tableName}' table...")
-        deltaTable = DeltaTable.forPath(spark, tablePath)
-        deltaTable.optimize().executeCompaction()
+    optimize_delta_tables(
+        tables=tables, source=lakehouse, source_type="Lakehouse", workspace=workspace
+    )
 
 
 @log