Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added delta analyzer #395

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
project = 'semantic-link-labs'
copyright = '2024, Microsoft and community'
author = 'Microsoft and community'
release = '0.8.11'
release = '0.8.13'

# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
Expand Down
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ dependencies:
- pytest-cov
- pytest-mock
- pip:
- semantic-link-sempy>=0.8.3
- semantic-link-sempy>=0.8.5
- azure-identity==1.7.1
- azure-storage-blob>=12.9.0
- pandas-stubs
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name="semantic-link-labs"
authors = [
{ name = "Microsoft Corporation" },
]
version="0.8.11"
version="0.8.13"
description="Semantic Link Labs for Microsoft Fabric"
readme="README.md"
requires-python=">=3.10,<3.12"
Expand All @@ -23,7 +23,7 @@ classifiers = [
license= { text = "MIT License" }

dependencies = [
"semantic-link-sempy>=0.8.3",
"semantic-link-sempy>=0.8.5",
"anytree",
"powerbiclient",
"polib",
Expand Down
6 changes: 6 additions & 0 deletions src/sempy_labs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from sempy_labs._job_scheduler import list_item_job_instances
from sempy_labs._delta_analyzer import delta_analyzer
from sempy_labs._gateways import (
list_gateway_members,
list_gateway_role_assigments,
Expand Down Expand Up @@ -185,6 +186,8 @@
evaluate_dax_impersonation,
get_dax_query_dependencies,
get_dax_query_memory_size,
dax_perf_test,
dax_perf_test_bulk,
)
from sempy_labs._generate_semantic_model import (
create_blank_semantic_model,
Expand Down Expand Up @@ -470,4 +473,7 @@
"bind_semantic_model_to_gateway",
"list_semantic_model_errors",
"list_item_job_instances",
"delta_analyzer",
"dax_perf_test",
"dax_perf_test_bulk",
]
165 changes: 164 additions & 1 deletion src/sempy_labs/_dax.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
resolve_dataset_name_and_id,
)
from sempy_labs._model_dependencies import get_model_calc_dependencies
from typing import Optional, List
from typing import Optional, List, Tuple
from sempy._utils._log import log
from uuid import UUID
from sempy_labs.directlake._warm_cache import _put_columns_into_memory
import sempy_labs._icons as icons
import time


@log
Expand Down Expand Up @@ -257,3 +259,164 @@ def get_dax_query_memory_size(
)

return df["Total Size"].sum()


@log
def dax_perf_test(
    dataset: str,
    dax_queries: dict,
    clear_cache_before_run: bool = False,
    refresh_type: Optional[str] = None,
    rest_time: int = 2,
    workspace: Optional[str] = None,
) -> Tuple[pd.DataFrame, dict]:
    """
    Runs a performance test on a set of DAX queries, capturing a SQL profiler
    trace of each run.

    Parameters
    ----------
    dataset : str
        Name of the semantic model.
    dax_queries : dict
        The DAX queries to run, keyed by a friendly query name. For example:

        {
            "Sales Amount Test": "EVALUATE SUMMARIZECOLUMNS(\\"Sales Amount\\", [Sales Amount])",
            "Order Quantity with Product": "EVALUATE SUMMARIZECOLUMNS('Product'[Color], \\"Order Qty\\", [Order Qty])",
        }
    clear_cache_before_run : bool, default=False
        If True, clears the model cache before each query is run.
    refresh_type : str, default=None
        If set, refreshes the semantic model with this refresh type before each
        query is run. Defaults to None, which performs no refresh.
    rest_time : int, default=2
        Rest time (in seconds) between the execution of each DAX query.
    workspace : str, default=None
        The Fabric workspace name.
        Defaults to None which resolves to the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the workspace of the notebook.

    Returns
    -------
    Tuple[pandas.DataFrame, dict]
        A pandas dataframe showing the SQL profiler trace results of the DAX queries.
        A dictionary of the query results in pandas dataframes.
    """
    # Imported locally (presumably to avoid circular imports at module load).
    from sempy_labs._refresh_semantic_model import refresh_semantic_model
    from sempy_labs._clear_cache import clear_cache

    # Trace columns requested per event kind; "end" events add timing/outcome
    # columns on top of the shared base set.
    base_cols = ["EventClass", "EventSubclass", "CurrentTime", "NTUserName", "TextData"]
    begin_cols = base_cols + ["StartTime"]
    end_cols = base_cols + ["StartTime", "EndTime", "Duration", "CpuTime", "Success"]

    # Events captured by the trace: query lifecycle, VertiPaq storage-engine
    # events (including cache matches), and execution metrics.
    event_schema = {
        "QueryBegin": begin_cols + ["ApplicationName"],
        "QueryEnd": end_cols + ["ApplicationName"],
    }

    event_schema["VertiPaqSEQueryBegin"] = begin_cols
    event_schema["VertiPaqSEQueryEnd"] = end_cols
    event_schema["VertiPaqSEQueryCacheMatch"] = base_cols
    event_schema["ExecutionMetrics"] = ["EventClass", "ApplicationName", "TextData"]

    # Query name -> result dataframe; returned alongside the trace dataframe.
    query_results = {}

    # Establish trace connection
    with fabric.create_trace_connection(
        dataset=dataset, workspace=workspace
    ) as trace_connection:
        with trace_connection.create_trace(event_schema) as trace:
            trace.start()
            print(f"{icons.in_progress} Starting performance testing...")
            # Loop through DAX queries
            for name, dax in dax_queries.items():

                # Optionally reset model state so each query starts cold.
                if clear_cache_before_run:
                    clear_cache(dataset=dataset, workspace=workspace)
                if refresh_type is not None:
                    refresh_semantic_model(
                        dataset=dataset, workspace=workspace, refresh_type=refresh_type
                    )

                # Trivial warm-up query run before the measured one; its trace
                # rows are filtered out below by matching the "EVALUATE {1}" text.
                fabric.evaluate_dax(
                    dataset=dataset, workspace=workspace, dax_string="""EVALUATE {1}"""
                )
                # Run DAX Query
                result = fabric.evaluate_dax(
                    dataset=dataset, workspace=workspace, dax_string=dax
                )

                # Add results to output
                query_results[name] = result

                time.sleep(rest_time)
                print(f"{icons.green_dot} The '{name}' query has completed.")

            df = trace.stop()
            # Allow time to collect trace results
            time.sleep(5)

    # Step 1: Filter out unnecessary operations
    # (background Power BI traffic and the warm-up "EVALUATE {1}" runs).
    query_names = list(dax_queries.keys())
    df = df[
        ~df["Application Name"].isin(["PowerBI", "PowerBIEIM"])
        & (~df["Text Data"].str.startswith("EVALUATE {1}"))
    ]
    query_begin = df["Event Class"] == "QueryBegin"
    # Step 2: Name queries per dictionary
    # Each dictionary entry is expected to yield two QueryBegin groups (warm-up
    # then real run), so the name list alternates a throwaway "<name>_removeXXX"
    # entry with the real name.
    # NOTE(review): warm-up QueryBegin rows were already dropped in Step 1, which
    # could shift this warm-up/real alternation — verify against a live trace.
    suffix = "_removeXXX"
    query_names_full = [
        item for query in query_names for item in (f"{query}{suffix}", query)
    ]
    # Step 3: Assign query names by group and convert to integer
    # Running count of QueryBegin events gives each event a 1-based group id,
    # forward-filled onto the remaining rows of that query's group.
    df["Query Name"] = (query_begin).cumsum()
    df["Query Name"] = df["Query Name"].where(query_begin, None).ffill()
    df["Query Name"] = pd.to_numeric(df["Query Name"], downcast="integer")
    # Step 4: Map to full query names
    df["Query Name"] = df["Query Name"].map(lambda x: query_names_full[x - 1])
    # Drop rows belonging to warm-up groups.
    df = df[~df["Query Name"].str.endswith(suffix)]

    df = df.reset_index(drop=True)

    return df, query_results


def dax_perf_test_bulk(
    mapping: dict,
    clear_cache_before_run: bool = False,
    refresh_type: Optional[str] = None,
    rest_time: int = 2,
) -> dict:
    """
    Runs dax_perf_test against multiple semantic models across multiple workspaces.

    Parameters
    ----------
    mapping : dict
        Nested mapping of workspace -> semantic model -> {query name: DAX query}.
        For example:

        mapping = {
            "Workspace1": {
                "Dataset1": {
                    "Query1": "EVALUATE ...",
                    "Query2": "EVALUATE ...",
                },
                "Dataset2": {
                    "Query3": "EVALUATE ...",
                    "Query4": "EVALUATE ...",
                },
            },
            "Workspace2": {
                "Dataset3": {
                    "Query5": "EVALUATE ...",
                    "Query6": "EVALUATE ...",
                },
                "Dataset4": {
                    "Query7": "EVALUATE ...",
                    "Query8": "EVALUATE ...",
                },
            },
        }
    clear_cache_before_run : bool, default=False
        If True, clears the model cache before each query is run.
    refresh_type : str, default=None
        If set, refreshes each semantic model with this refresh type before each
        query is run. Defaults to None, which performs no refresh.
    rest_time : int, default=2
        Rest time (in seconds) between the execution of each DAX query.

    Returns
    -------
    dict
        A dictionary keyed by (workspace, semantic model name) containing, for
        each model, the (trace dataframe, query results) tuple returned by
        dax_perf_test.
    """
    results = {}
    for workspace, datasets in mapping.items():
        for dataset, queries in datasets.items():
            # Collect each run's output instead of discarding it so callers can
            # analyze the trace results of every model tested.
            results[(workspace, dataset)] = dax_perf_test(
                dataset=dataset,
                dax_queries=queries,
                clear_cache_before_run=clear_cache_before_run,
                refresh_type=refresh_type,
                rest_time=rest_time,
                workspace=workspace,
            )
    return results
Loading