feat: 4305 added gspread analytics package (#4314)
* feat: added spreadsheet functions to ga package (#4305)
* chore: refactored ga metric and dimension names to constants (#4305)
* chore: finished refactor (#4305)
* chore: bumped analytics package version (#4310)
* chore: added new dependencies to setup.py (#4310)
* fix: removed redundant api authentication (#4305)
Showing 6 changed files with 378 additions and 2 deletions.
@@ -0,0 +1,25 @@
# Metric names
METRIC_EVENT_COUNT = 'eventCount'
METRIC_TOTAL_USERS = 'totalUsers'

# Dimensions
DIMENSION_PAGE_PATH = {
    'id': 'pagePath',
    'alias': 'page_path',
    'remove_matches': None,
}
DIMENSION_BUILTIN_URL = {
    'id': 'linkUrl',
    'alias': 'builtin_url',
    'remove_matches': r"\s*",
}
DIMENSION_EVENT_NAME = {
    'id': 'eventName',
    'alias': 'event_name',
    'remove_matches': None,
}
DIMENSION_CUSTOM_URL = {
    'id': 'customEvent:click_url',
    'alias': 'outbound_url',
    'remove_matches': r"\(not set\)",
}
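
Each dimension dict pairs a GA4 API field id with a local alias and an optional regex of values to filter out. A quick sketch of how the code added later in this commit consumes them:

# Sketch: derive API column ids, a rename map, and drop patterns from the dicts
dimensions = [DIMENSION_PAGE_PATH, DIMENSION_CUSTOM_URL]
api_ids = [d["id"] for d in dimensions]                  # ['pagePath', 'customEvent:click_url']
rename_map = {d["id"]: d["alias"] for d in dimensions}   # API id -> friendly column name
drop_patterns = [d["remove_matches"] for d in dimensions]  # None means keep all values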
@@ -0,0 +1,245 @@
import gspread
import gspread_formatting
from enum import Enum
from googleapiclient.discovery import build


class FILE_OVERRIDE_BEHAVIORS(Enum):
    OVERRIDE_IF_IN_SAME_PLACE = 1
    EXIT_IF_IN_SAME_PLACE = 2
    EXIT_ANYWHERE = 3


class WORKSHEET_OVERRIDE_BEHAVIORS(Enum):
    OVERRIDE = 1
    EXIT = 2


FONT_SIZE_PTS = 10
PTS_PIXELS_RATIO = 4/3  # 1 pt = 4/3 px at 96 DPI; used to convert character counts to pixel widths
DEFAULT_BUFFER_CHARS = 2


def extract_credentials(authentication_response):
    """Extracts the credentials from the tuple from api.authenticate"""
    return authentication_response[3]


def authenticate_gspread(authentication_response):
    """Authenticates the gspread client using the credentials in the tuple from api.authenticate"""
    return gspread.authorize(extract_credentials(authentication_response))


def authenticate_drive_api(authentication_response):
    """Authenticates the Drive API using the response from api.authenticate"""
    return authentication_response[0]
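
A minimal sketch of the assumed wiring: these helpers only imply that api.authenticate returns a tuple whose element 0 is the Drive service and element 3 is the OAuth credentials, so treat the shape below as an assumption, not a documented interface.

# Hypothetical usage; `api.authenticate` is not shown in this diff, and the
# indices are read off extract_credentials and authenticate_drive_api above.
from . import api

authentication_response = api.authenticate()
drive_api = authenticate_drive_api(authentication_response)  # element 0: Drive service
gc = authenticate_gspread(authentication_response)           # element 3: credentials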

def check_sheet_exists(gc, sheet_name):
    """
    Determine if a sheet named 'sheet_name' exists anywhere in the project.
    :param gc: the gspread client
    :param sheet_name: the name of the sheet to check for
    :returns: True if the sheet exists, otherwise False
    """
    try:
        gc.open(sheet_name)
        return True
    except gspread.exceptions.SpreadsheetNotFound:
        return False

def execute_drive_list(drive_api, search_params):
    """
    Execute a files().list() request on the Drive API with the given search parameters,
    following pagination until all results have been collected.
    :param drive_api: the Drive API object
    :param search_params: the search parameters, see https://developers.google.com/drive/api/v3/search-files
    :returns: the 'files' components of the response
    """
    files_found = []
    page_token = None
    while True:
        request = drive_api.files().list(q=search_params, spaces="drive", pageToken=page_token)
        response = request.execute()
        page_token = response.get("nextPageToken", None)
        files_found += response.get("files", [])
        if page_token is None:
            break
    return files_found
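
For example, listing every non-trashed Google Sheets file visible to the authenticated account (query grammar per the Drive v3 search docs linked above):

query = "mimeType = 'application/vnd.google-apps.spreadsheet' and trashed = false"
for f in execute_drive_list(drive_api, query):
    print(f["id"], f["name"])  # id and name are in the default response fields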

def search_for_folder_id(drive_api, folder_name, allow_trashed=False, allow_duplicates=False):
    """
    Search for a folder by name in the Drive API.
    :param drive_api: the Drive API object
    :param folder_name: the name of the folder to search for
    :param allow_trashed: whether to include trashed folders in the search, defaults to False
    :param allow_duplicates: whether to allow multiple folders with the same name, defaults to False
    :returns: a list of folder ids that match the search criteria
    """
    search_params = f"name = '{folder_name}' and mimeType = 'application/vnd.google-apps.folder'"
    if not allow_trashed:
        search_params += " and trashed = false"

    files = execute_drive_list(drive_api, search_params)
    files_exact_match = tuple(filter(lambda file: file["name"] == folder_name, files))

    if len(files_exact_match) > 1 and not allow_duplicates:
        raise RuntimeError("Too many files returned")
    if len(files_exact_match) == 0:
        raise RuntimeError("No such folder exists")

    return [file["id"] for file in files_exact_match]
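
Usage sketch (folder name illustrative); note that the function raises rather than returning an empty list when nothing matches:

# With allow_duplicates=False (the default) a name collision raises, so
# taking [0] is safe whenever the call returns at all.
parent_id = search_for_folder_id(drive_api, "analytics-reports")[0]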

def create_sheet_in_folder(authentication_response, sheet_name, parent_folder_name=None,
                           override_behavior=FILE_OVERRIDE_BEHAVIORS.EXIT_ANYWHERE):
    """
    Create a new sheet in the project with the given name and parent folder.
    :param authentication_response: the service parameters tuple
    :param sheet_name: the name of the new sheet
    :param parent_folder_name: the name of the parent folder for the new sheet
    :param override_behavior: the behavior to take if the sheet already exists
    :returns: the gspread.Spreadsheet object of the new sheet
    :rtype: gspread.Spreadsheet
    """
    # Build Drive API
    gc = authenticate_gspread(authentication_response)
    drive_api = authenticate_drive_api(authentication_response)
    parent_folder_id = None if parent_folder_name is None else search_for_folder_id(drive_api, parent_folder_name)[0]

    # Check if sheet already exists and handle based on input
    if check_sheet_exists(gc, sheet_name):
        if override_behavior == FILE_OVERRIDE_BEHAVIORS.EXIT_ANYWHERE:
            raise RuntimeError("Sheet already exists")
        matching_search = f"name = '{sheet_name}' and mimeType = 'application/vnd.google-apps.spreadsheet'"
        if parent_folder_id is None:
            matching_search += " and 'root' in parents"
        else:
            matching_search += f" and '{parent_folder_id}' in parents"
        matching_files = execute_drive_list(drive_api, matching_search)

        if len(matching_files) > 0:
            if override_behavior == FILE_OVERRIDE_BEHAVIORS.EXIT_IF_IN_SAME_PLACE:
                raise RuntimeError("File already exists in the same folder")
            elif override_behavior == FILE_OVERRIDE_BEHAVIORS.OVERRIDE_IF_IN_SAME_PLACE:
                for file in matching_files:
                    drive_api.files().delete(fileId=file["id"]).execute()

    # Create file body
    body = {
        'name': sheet_name,
        'mimeType': 'application/vnd.google-apps.spreadsheet',
    }
    if parent_folder_id is not None:
        body["parents"] = [parent_folder_id]
    request = drive_api.files().create(body=body)
    new_sheet = request.execute()

    # Open the fresh sheet by its id
    return gc.open_by_key(new_sheet["id"])
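
A sketch of creating a report spreadsheet inside that folder, overwriting a previous run when one exists in the same place (names illustrative):

sheet = create_sheet_in_folder(
    authentication_response,
    "outbound-links-report",
    parent_folder_name="analytics-reports",
    override_behavior=FILE_OVERRIDE_BEHAVIORS.OVERRIDE_IF_IN_SAME_PLACE,
)
print(sheet.url)  # gspread.Spreadsheet exposes the new sheet's URL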

def fill_worksheet_with_df(
    sheet,
    df,
    worksheet_name,
    overlapBehavior,
    options={
        "bold_header": True,
        "center_header": True,
        "freeze_header": True,
        "column_widths": {"justify": True, "buffer_chars": DEFAULT_BUFFER_CHARS}
    }
):
    """
    Fill a worksheet with the contents of a DataFrame.
    If the worksheet already exists, the behavior is determined by overlapBehavior.
    The options dictionary can be used to customize the formatting of the worksheet.
    :param sheet: the gspread.Spreadsheet object
    :param df: the DataFrame to fill the worksheet with
    :param worksheet_name: the name of the worksheet to fill. Cannot be "Sheet1"
    :param overlapBehavior: the behavior to take if the worksheet already exists
    :param options: the formatting options for the worksheet.
        A dictionary with optional elements "bold_header", "center_header", "freeze_header", and "column_widths"
    """
    # Sheet1 is special since it's created by default, so it's not allowed
    assert worksheet_name != "Sheet1"

    # Check if worksheet already exists and handle based on overlapBehavior
    try:
        worksheet = sheet.worksheet(worksheet_name)
        if overlapBehavior == WORKSHEET_OVERRIDE_BEHAVIORS.EXIT:
            raise RuntimeError("Worksheet already exists")
    except gspread.exceptions.WorksheetNotFound:
        worksheet = sheet.add_worksheet(
            # One extra row leaves room for the header above df's data rows
            title=worksheet_name, rows=df.shape[0] + 1, cols=df.shape[1]
        )

    # Add data to worksheet
    worksheet.update([df.columns.values.tolist()] + df.values.tolist())

    # Format worksheet
    # Justify column widths
    if "column_widths" not in options or options["column_widths"].get("justify", True):
        text_widths = df.columns.map(
            lambda column_name: df[column_name].astype(str).str.len().max()
        )
        header_widths = df.columns.str.len()
        buffer_chars = (
            DEFAULT_BUFFER_CHARS
            if ("column_widths" not in options or "buffer_chars" not in options["column_widths"])
            else options["column_widths"]["buffer_chars"]
        )
        # Approximate pixel width: characters times the font size in points, converted to pixels
        column_widths = [
            round((max(len_tuple) + buffer_chars) * FONT_SIZE_PTS * 1 / PTS_PIXELS_RATIO)
            for len_tuple in zip(text_widths, header_widths)
        ]
        # First character of each A1 address, e.g. 'A'; assumes at most 26 columns
        column_positions = [
            gspread.utils.rowcol_to_a1(1, i + 1)[0] for i, _ in enumerate(column_widths)
        ]
        gspread_formatting.set_column_widths(worksheet, zip(column_positions, column_widths))
    # Freeze header
    if "freeze_header" not in options or options["freeze_header"]:
        gspread_formatting.set_frozen(worksheet, rows=1)
    format_options = gspread_formatting.CellFormat()
    # Bold header
    if "bold_header" not in options or options["bold_header"]:
        format_options += gspread_formatting.CellFormat(textFormat=gspread_formatting.TextFormat(bold=True))
    # Center header
    if "center_header" not in options or options["center_header"]:
        format_options += gspread_formatting.CellFormat(horizontalAlignment="CENTER")
    gspread_formatting.format_cell_range(
        worksheet,
        f"A1:{gspread.utils.rowcol_to_a1(1, len(df.columns))}",
        format_options
    )

    # Delete Sheet1 if it has been created by default
    if "Sheet1" in [i.title for i in sheet.worksheets()]:
        sheet.del_worksheet(sheet.worksheet("Sheet1"))
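
A short sketch of writing a frame with the default header formatting plus one overridden option (data illustrative):

import pandas as pd

df = pd.DataFrame({"Hostname": ["example.org"], "Total Clicks": [42]})
fill_worksheet_with_df(
    sheet,
    df,
    "Summary",
    WORKSHEET_OVERRIDE_BEHAVIORS.OVERRIDE,
    options={"column_widths": {"justify": True, "buffer_chars": 4}},
)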

def fill_spreadsheet_with_df_dict(sheet, df_dict, overlapBehavior, options={}):
    """
    Fill a sheet with the contents of a dictionary of DataFrames.
    The keys of the dictionary are the names of the worksheets, and the values contain the data to be placed in the sheet.
    If any worksheets would be overridden, the behavior is determined by overlapBehavior.
    :param sheet: the gspread.Spreadsheet object
    :param df_dict: the dictionary of DataFrames to fill the worksheets with
    :param overlapBehavior: the behavior to take if any of the worksheets already exist
    :param options: the formatting options for the worksheets.
        A dictionary with optional elements "bold_header", "center_header", "freeze_header", and "column_widths"
    """
    if overlapBehavior == WORKSHEET_OVERRIDE_BEHAVIORS.EXIT:
        # Fail up front, before any worksheet is written
        for worksheet_name in df_dict.keys():
            try:
                sheet.worksheet(worksheet_name)
                raise RuntimeError("Worksheet already exists")
            except gspread.exceptions.WorksheetNotFound:
                pass
    for worksheet_name, df in df_dict.items():
        fill_worksheet_with_df(sheet, df, worksheet_name, overlapBehavior, options=options)
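
And the batch variant, which checks all names before writing anything when EXIT is requested (the frames here are assumed from the earlier sketches):

fill_spreadsheet_with_df_dict(
    sheet,
    {"Summary": df, "Outbound Links": df_outbound},  # df_outbound is hypothetical
    WORKSHEET_OVERRIDE_BEHAVIORS.EXIT,
)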
@@ -0,0 +1,98 @@
import pandas as pd
from .charts import get_data_df
from .fields import *
from urllib.parse import urlparse


def get_flat_data_df(analytics_params, metrics, dimensions, remove_matches=None):
    """
    Get a df from the Analytics API with a flat structure (no multiindex).
    :param analytics_params: the parameters for the Analytics API, including authentication and property ids
    :param metrics: the metrics to get
    :param dimensions: the dimensions to get
    :param remove_matches: a list of regex patterns or None elements to remove from each dimension.
        Each element should correspond with an element of dimensions, and remove_matches must be the same length as dimensions.
        If an element is None, no patterns are removed for that dimension; defaults to None.
    :return: a DataFrame with the data from the Analytics API
    """
    if remove_matches is not None:
        assert len(remove_matches) == len(dimensions)

    df = get_data_df(
        metrics,
        [dimension["id"] for dimension in dimensions],
        **analytics_params,
    )
    if remove_matches is not None:
        # Filter on the patterns the caller passed in; the original read the patterns
        # from the dimension dicts instead, silently ignoring this argument
        for i, match in enumerate(remove_matches):
            if match is not None:
                df = df.loc[~df.index.get_level_values(i).str.fullmatch(match)]
    return df.reset_index().rename(columns=get_rename_dict(dimensions)).copy()
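
For instance, the builtin-link pull later in this file reduces to the call below; analytics_params is whatever auth/property bundle get_data_df expects, and its shape is not shown in this diff:

df = get_flat_data_df(
    analytics_params,
    [METRIC_EVENT_COUNT, METRIC_TOTAL_USERS],
    [DIMENSION_PAGE_PATH, DIMENSION_BUILTIN_URL],
    remove_matches=[None, DIMENSION_BUILTIN_URL["remove_matches"]],  # drop blank URLs
)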

def get_rename_dict(dimensions):
    """Get a dictionary to rename the columns of a DataFrame."""
    return {dimension["id"]: dimension["alias"] for dimension in dimensions}

def get_outbound_sheets_df(analytics_params):
    """
    Get a DF with outbound links from the Analytics API. Merges the builtin and custom events for outbound links.
    :param analytics_params: the parameters for the Analytics API, including authentication and property ids
    :return: a DataFrame with the outbound links from the Analytics API
    """
    pd.set_option('future.no_silent_downcasting', True)
    # Get the builtin "Click" event
    df_builtin_links = get_flat_data_df(
        analytics_params,
        [METRIC_EVENT_COUNT, METRIC_TOTAL_USERS],
        [DIMENSION_PAGE_PATH, DIMENSION_BUILTIN_URL, DIMENSION_EVENT_NAME],
        remove_matches=[None, r"\s*", None]
    ).groupby(
        [DIMENSION_PAGE_PATH["alias"], DIMENSION_BUILTIN_URL["alias"]]
    ).sum().reset_index()

    # Get the custom "outbound_link_click" event
    df_custom_links = get_flat_data_df(
        analytics_params,
        [METRIC_EVENT_COUNT, METRIC_TOTAL_USERS],
        [DIMENSION_EVENT_NAME, DIMENSION_CUSTOM_URL, DIMENSION_PAGE_PATH],
        remove_matches=[DIMENSION_EVENT_NAME["remove_matches"], r"\(not set\)", None],
    ).groupby(
        [DIMENSION_PAGE_PATH["alias"], DIMENSION_CUSTOM_URL["alias"]]
    ).sum().reset_index()

    # Concatenate the two dataframes, avoiding duplicates.
    # Keep the link from the builtin event, unless the link contains a #fragment,
    # in which case keep the link from the custom event, which preserves the fragment
    df_builtin_links["builtin"] = True
    df_builtin_links["truncated_url"] = df_builtin_links[DIMENSION_BUILTIN_URL["alias"]]
    df_custom_links["truncated_url"] = df_custom_links[DIMENSION_CUSTOM_URL["alias"]].str.replace(r"#.*", "", regex=True)
    df_outbound_links_fragments = df_custom_links.loc[df_custom_links[DIMENSION_CUSTOM_URL["alias"]].str.contains("#")].copy()
    df_outbound_links_fragments["is_fragment"] = True
    df_all_links = pd.concat(
        [df_builtin_links, df_outbound_links_fragments], ignore_index=True
    )
    # Drop builtin rows whose URL also appears among the fragment-bearing custom rows,
    # so each such link is represented once, by its fragment-preserving custom row
    df_all_links = df_all_links.loc[
        ~(df_all_links["truncated_url"].isin(df_outbound_links_fragments["truncated_url"]) & df_all_links["builtin"])
    ].sort_values(METRIC_EVENT_COUNT, ascending=False)
    df_all_links["is_fragment"] = df_all_links["is_fragment"].fillna(False).astype(bool)
    # Take the URL from the builtin column, except for fragment rows, which use the custom column
    df_all_links["complete_url"] = df_all_links["builtin_url"].where(
        ~df_all_links["is_fragment"],
        df_all_links["outbound_url"]
    )
    df_all_links["hostname"] = df_all_links["complete_url"].map(lambda x: urlparse(x).hostname)
    df_all_links = df_all_links.drop(
        columns=["builtin_url", "outbound_url", "builtin", "is_fragment"]
    ).rename(
        columns={
            DIMENSION_PAGE_PATH["alias"]: "Page Path",
            "complete_url": "Outbound Link",
            METRIC_EVENT_COUNT: "Total Clicks",
            METRIC_TOTAL_USERS: "Total Users",
            "hostname": "Hostname",
        }
    )[["Page Path", "Hostname", "Outbound Link", "Total Clicks", "Total Users"]]
    return df_all_links.copy().reset_index(drop=True)
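
Putting the pieces together, a hedged end-to-end sketch; the module paths are assumptions, since the diff does not show the package layout:

# Hypothetical module names; adjust to the package's real layout.
from ga import api, gspread_sheets, outbound

authentication_response = api.authenticate()
analytics_params = ...  # auth and property ids consumed by get_data_df (not shown here)

sheet = gspread_sheets.create_sheet_in_folder(
    authentication_response,
    "outbound-links-report",
    override_behavior=gspread_sheets.FILE_OVERRIDE_BEHAVIORS.OVERRIDE_IF_IN_SAME_PLACE,
)
df = outbound.get_outbound_sheets_df(analytics_params)
gspread_sheets.fill_worksheet_with_df(
    sheet, df, "Outbound Links", gspread_sheets.WORKSHEET_OVERRIDE_BEHAVIORS.OVERRIDE
)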