Skip to content

Commit

Permalink
Merge pull request #106 from atlanhq/AM-320
Browse files Browse the repository at this point in the history
Enhancements
  • Loading branch information
ErnestoLoma authored Aug 31, 2023
2 parents 2069bfd + 40b47ff commit 1bedfb5
Show file tree
Hide file tree
Showing 10 changed files with 484 additions and 161 deletions.
3 changes: 3 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 0.6.3 (August 31, 2023)
* Add ability to get, create and modify API tokens

## 0.6.2 (August 24, 2023)
* Fix bug in generating field class vars

Expand Down
244 changes: 237 additions & 7 deletions pyatlan/client/atlan.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@
CREATE_GROUP,
CREATE_TYPE_DEFS,
CREATE_USERS,
DELETE_API_TOKEN,
DELETE_ENTITIES_BY_GUIDS,
DELETE_ENTITY_BY_ATTRIBUTE,
DELETE_GROUP,
DELETE_TYPE_DEF_BY_NAME,
GET_ALL_TYPE_DEFS,
GET_API_TOKENS,
GET_CURRENT_USER,
GET_ENTITY_BY_GUID,
GET_ENTITY_BY_UNIQUE_ATTRIBUTE,
Expand All @@ -62,9 +64,11 @@
UPDATE_TYPE_DEFS,
UPDATE_USER,
UPLOAD_IMAGE,
UPSERT_API_TOKEN,
)
from pyatlan.error import AtlanError, NotFoundError
from pyatlan.exceptions import AtlanServiceException, InvalidRequestException
from pyatlan.model.api_tokens import ApiToken, ApiTokenRequest, ApiTokenResponse
from pyatlan.model.assets import (
Asset,
AtlasGlossary,
Expand Down Expand Up @@ -671,15 +675,17 @@ def get_groups(
raw_json = self._call_api(GET_GROUPS.format_path_with_params(), query_params)
return GroupResponse(**raw_json)

def get_all_groups(self) -> list[AtlanGroup]:
def get_all_groups(
self,
limit: int = 20,
) -> list[AtlanGroup]:
"""
Retrieve all groups defined in Atlan.
:returns: a list of all the groups in Atlan
"""
groups: list[AtlanGroup] = []
offset = 0
limit = 100
response: Optional[GroupResponse] = self.get_groups(
offset=offset, limit=limit, sort="createdAt"
)
Expand All @@ -693,7 +699,9 @@ def get_all_groups(self) -> list[AtlanGroup]:
return groups

def get_group_by_name(
self, alias: str, limit: int = 100
self,
alias: str,
limit: int = 20,
) -> Optional[list[AtlanGroup]]:
"""
Retrieve all groups with a name that contains the provided string.
Expand Down Expand Up @@ -876,7 +884,7 @@ def get_users(
:param post_filter: which users to retrieve
:param sort: property by which to sort the results
:param count: whether to return the total number of records (True) or not (False)
:param offset: starting ponit for results to return, for paging
:param offset: starting point for results to return, for paging
:returns: a list of users that match the provided criteria
"""
query_params: dict[str, str] = {
Expand All @@ -892,15 +900,17 @@ def get_users(
raw_json = self._call_api(GET_USERS.format_path_with_params(), query_params)
return UserResponse(**raw_json)

def get_all_users(self) -> list[AtlanUser]:
def get_all_users(
self,
limit: int = 20,
) -> list[AtlanUser]:
"""
Retrieve all users defined in Atlan.
:returns: a list of all the users in Atlan
"""
users: list[AtlanUser] = []
offset = 0
limit = 100
response: Optional[UserResponse] = self.get_users(
offset=offset, limit=limit, sort="username"
)
Expand All @@ -914,7 +924,9 @@ def get_all_users(self) -> list[AtlanUser]:
return users

def get_users_by_email(
self, email: str, limit: int = 100
self,
email: str,
limit: int = 20,
) -> Optional[list[AtlanUser]]:
"""
Retrieves all users with email addresses that contain the provided email.
Expand Down Expand Up @@ -1911,6 +1923,224 @@ def get_lineage_list(
assets=assets,
)

def add_api_token_as_admin(
self, asset_guid: str, impersonation_token: str
) -> AssetMutationResponse:
"""
Add the API token configured for the default client as an admin to the asset with the provided GUID.
This is primarily useful for connections, to allow the API token to manage policies for the connection, and
for query collections, to allow the API token to manage the queries in a collection or the collection itself.
:param asset_guid: unique identifier (GUID) of the asset to which we should add this API token as an admin
:param impersonation_token: a bearer token for an actual user who is already an admin for the asset,
NOT an API token
"""
from pyatlan.model.assets.asset00 import Asset
from pyatlan.model.fluent_search import FluentSearch

token_user = str(self.get_current_user().username)
existing_token = self.api_key
self.api_key = impersonation_token

# Look for the asset as the impersonated user, ensuring we include the admin users
# in the results (so we avoid clobbering any existing admin users)
request = (
FluentSearch()
.where(Asset.GUID.eq(asset_guid))
.include_on_results(Asset.ADMIN_USERS)
.page_size(1)
).to_request()
results = self.search(request)
if results.current_page():
asset = results.current_page()[0]
existing_admins = asset.admin_users or set()
existing_admins.add(token_user)
to_update = asset.trim_to_required()
to_update.admin_users = existing_admins
response = self.save(to_update)
else:
self.api_key = existing_token
raise NotFoundError(
message=f"Asset with GUID {asset_guid} does not exist.",
code="ATLAN-PYTHON-404-001",
)

self.api_key = existing_token

return response

def add_api_token_as_viewer(
self, asset_guid: str, impersonation_token: str
) -> AssetMutationResponse:
"""
Add the API token configured for the default client as a viewer to the asset with the provided GUID.
This is primarily useful for query collections, to allow the API token to view or run queries within the
collection, but not make any changes to them.
:param asset_guid: unique identifier (GUID) of the asset to which we should add this API token as an admin
:param impersonation_token: a bearer token for an actual user who is already an admin for the asset,
NOT an API token
"""
from pyatlan.model.assets.asset00 import Asset
from pyatlan.model.fluent_search import FluentSearch

token_user = str(self.get_current_user().username)
existing_token = self.api_key
self.api_key = impersonation_token

# Look for the asset as the impersonated user, ensuring we include the admin users
# in the results (so we avoid clobbering any existing admin users)
request = (
FluentSearch()
.where(Asset.GUID.eq(asset_guid))
.include_on_results(Asset.VIEWER_USERS)
.page_size(1)
).to_request()
results = self.search(request)
if results.current_page():
asset = results.current_page()[0]
existing_viewers = asset.viewer_users or set()
existing_viewers.add(token_user)
to_update = asset.trim_to_required()
to_update.viewer_users = existing_viewers
response = self.save(to_update)
else:
self.api_key = existing_token
raise NotFoundError(
message=f"Asset with GUID {asset_guid} does not exist.",
code="ATLAN-PYTHON-404-001",
)

self.api_key = existing_token

return response

def get_api_tokens(
self,
limit: Optional[int] = None,
post_filter: Optional[str] = None,
sort: Optional[str] = None,
count: bool = True,
offset: int = 0,
) -> ApiTokenResponse:
"""
Retrieves a list of API tokens defined in Atlan.
:param limit: maximum number of results to be returned
:param post_filter: which API tokens to retrieve
:param sort: property by which to sort the results
:param count: whether to return the total number of records (True) or not (False)
:param offset: starting point for results to return, for paging
:returns: a list of API tokens that match the provided criteria
"""
query_params: dict[str, str] = {
"count": str(count),
"offset": str(offset),
}
if limit is not None:
query_params["limit"] = str(limit)
if post_filter is not None:
query_params["filter"] = post_filter
if sort is not None:
query_params["sort"] = sort
raw_json = self._call_api(
GET_API_TOKENS.format_path_with_params(), query_params
)
return ApiTokenResponse(**raw_json)

def get_api_token_by_name(self, display_name: str) -> Optional[ApiToken]:
"""
Retrieves the API token with a name that exactly matches the provided string.
:param display_name: name (as it appears in the UI) by which to retrieve the API token
:returns: the API token whose name (in the UI) matches the provided string, or None if there is none
"""
if response := self.get_api_tokens(
offset=0,
limit=5,
post_filter='{"displayName":"' + display_name + '"}',
):
if response.records and len(response.records) >= 1:
return response.records[0]
return None

def get_api_token_by_id(self, client_id: str) -> Optional[ApiToken]:
"""
Retrieves the API token with a client ID that exactly matches the provided string.
:param client_id: unique client identifier by which to retrieve the API token
:returns: the API token whose clientId matches the provided string, or None if there is none
"""
if response := self.get_api_tokens(
offset=0,
limit=5,
post_filter='{"clientId":"' + client_id + '"}',
):
if response.records and len(response.records) >= 1:
return response.records[0]
return None

def create_api_token(
self,
display_name: str,
description: str = "",
personas: Optional[set[str]] = None,
validity_seconds: int = -1,
) -> ApiToken:
"""
Create a new API token with the provided settings.
:param display_name: human-readable name for the API token
:param description: optional explanation of the API token
:param personas: unique identifiers (GUIDs) of personas that should be linked to the token
:param validity_seconds: time in seconds after which the token should expire (negative numbers are treated as
infinite)
:returns: the created API token
"""
request = ApiTokenRequest(
display_name=display_name,
description=description,
personas=personas,
validity_seconds=validity_seconds,
)
raw_json = self._call_api(UPSERT_API_TOKEN, request_obj=request)
return ApiToken(**raw_json)

def update_api_token(
self,
guid: str,
display_name: str,
description: str = "",
personas: Optional[set[str]] = None,
) -> ApiToken:
"""
Update an existing API token with the provided settings.
:param guid: unique identifier (GUID) of the API token
:param display_name: human-readable name for the API token
:param description: optional explanation of the API token
:param personas: unique identifiers (GUIDs) of personas that should be linked to the token
:returns: the created API token
"""
request = ApiTokenRequest(
display_name=display_name,
description=description,
personas=personas,
validity_seconds=None,
)
raw_json = self._call_api(
UPSERT_API_TOKEN.format_path_with_params(guid), request_obj=request
)
return ApiToken(**raw_json)

def purge_api_token(self, guid: str) -> None:
"""
Delete (purge) the specified API token.
:param guid: unique identifier (GUID) of the API token to delete
"""
self._call_api(DELETE_API_TOKEN.format_path_with_params(guid))

def get_keycloak_events(
self, keycloak_request: KeycloakEventRequest
) -> KeycloakEventResponse:
Expand Down
6 changes: 6 additions & 0 deletions pyatlan/client/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
QUERY_API = f"{SQL_URI}query"
IMAGE_API = f"{ADMIN_URI}images"
LOGS_API = f"{ADMIN_URI}events"
TOKENS_API = f"{ADMIN_URI}apikeys"

# Role APIs
GET_ROLES = API(ROLE_API, HTTPMethod.GET, HTTPStatus.OK)
Expand Down Expand Up @@ -59,6 +60,11 @@
KEYCLOAK_EVENTS = API(f"{LOGS_API}/login", HTTPMethod.GET, HTTPStatus.OK)
ADMIN_EVENTS = API(f"{LOGS_API}/main", HTTPMethod.GET, HTTPStatus.OK)

# API token APIs
GET_API_TOKENS = API(TOKENS_API, HTTPMethod.GET, HTTPStatus.OK)
UPSERT_API_TOKEN = API(TOKENS_API, HTTPMethod.POST, HTTPStatus.OK)
DELETE_API_TOKEN = API(TOKENS_API, HTTPMethod.DELETE, HTTPStatus.OK)

ENTITY_API = f"{BASE_URI}entity/"
PREFIX_ATTR = "attr:"
PREFIX_ATTR_ = "attr_"
Expand Down
12 changes: 9 additions & 3 deletions pyatlan/generator/templates/methods/asset/connection.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,17 @@
)
if admin_users:
from pyatlan.cache.user_cache import UserCache
from pyatlan.client.atlan import AtlanClient
for username in admin_users:
if not UserCache.get_id_for_name(username):
raise ValueError(
f"Provided username {username} was not found in Atlan."
)
# If we cannot find the username, fallback to looking for an API token
client = AtlanClient.get_default_client()
if client is None:
client = AtlanClient()
if not client.get_api_token_by_id(username):
raise ValueError(
f"Provided username {username} was not found in Atlan."
)
attr = cls.Attributes(
name=name,
qualified_name=connector_type.to_qualified_name(),
Expand Down
Loading

0 comments on commit 1bedfb5

Please sign in to comment.