Skip to content

Commit

Permalink
Added validators to Asset Request and Bulk request to flush the custo…
Browse files Browse the repository at this point in the history
…m metadat of the assets
  • Loading branch information
ErnestoLoma committed Jun 5, 2023
1 parent bf62307 commit b2d4c1e
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
3 changes: 3 additions & 0 deletions pyatlan/model/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ def get_custom_metadata(self, name: str) -> CustomMetadataDict:
def set_custom_metadata(self, custom_metadata: CustomMetadataDict):
return self._metadata_proxy.set_custom_metadata(custom_metadata=custom_metadata)

def flush_custom_metadata(self):
self.business_attributes = self._metadata_proxy.business_attributes


class Asset(Referenceable):
"""Description"""
Expand Down
18 changes: 17 additions & 1 deletion pyatlan/model/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright 2022 Atlan Pte. Ltd.
from typing import TYPE_CHECKING

from pydantic import BaseModel, Extra, Field
from pydantic import BaseModel, Extra, Field, validator

if TYPE_CHECKING:
from dataclasses import dataclass
Expand Down Expand Up @@ -184,6 +184,22 @@ class AssetResponse(AtlanObject, GenericModel, Generic[T]):
class AssetRequest(AtlanObject, GenericModel, Generic[T]):
entity: T

@validator("entity")
def flush_custom_metadata(cls, v):
from pyatlan.model.assets import Asset

if isinstance(v, Asset):
v.flush_custom_metadata()
return v


class BulkRequest(AtlanObject, GenericModel, Generic[T]):
entities: list[T]

@validator("entities", each_item=True)
def flush_custom_metadata(cls, v):
from pyatlan.model.assets import Asset

if isinstance(v, Asset):
v.flush_custom_metadata()
return v
14 changes: 7 additions & 7 deletions tests/integration/custom_metadata_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ def test_add_term_cm_raci(
groups: List[AtlanGroup],
):
cm_name = make_unique("RACI")
raci_attrs = term.get_custom_metadata(cm_name)
raci_attrs = CustomMetadataDict(cm_name)
_validate_raci_empty(raci_attrs)
group1, group2 = _get_groups(client, make_unique)
raci_attrs[CM_ATTR_RACI_RESPONSIBLE] = [FIXED_USER]
Expand All @@ -424,7 +424,7 @@ def test_add_term_cm_ipr(
term: AtlasGlossaryTerm,
):
cm_name = make_unique("IPR")
ipr_attrs = term.get_custom_metadata(cm_name)
ipr_attrs = CustomMetadataDict(cm_name)
_validate_ipr_empty(ipr_attrs)
ipr_attrs[CM_ATTR_IPR_LICENSE] = "CC BY"
ipr_attrs[CM_ATTR_IPR_VERSION] = 2.0
Expand All @@ -445,7 +445,7 @@ def test_add_term_cm_dq(
term: AtlasGlossaryTerm,
):
cm_name = make_unique("DQ")
dq_attrs = term.get_custom_metadata(cm_name)
dq_attrs = CustomMetadataDict(cm_name)
_validate_dq_empty(dq_attrs)
dq_attrs[CM_ATTR_QUALITY_COUNT] = 42
dq_attrs[CM_ATTR_QUALITY_SQL] = "SELECT * from SOMEWHERE;"
Expand All @@ -464,7 +464,7 @@ def test_update_term_cm_ipr(
term: AtlasGlossaryTerm,
):
cm_name = make_unique("IPR")
ipr = term.get_custom_metadata(cm_name)
ipr = CustomMetadataDict(cm_name)
# Note: MUST access the getter / setter, not the underlying store
ipr[CM_ATTR_IPR_MANDATORY] = False
client.update_custom_metadata_attributes(term.guid, ipr)
Expand All @@ -487,7 +487,7 @@ def test_replace_term_cm_raci(
CM_RACI = make_unique("RACI")
CM_IPR = make_unique("IPR")
CM_QUALITY = make_unique("DQ")
raci = term.get_custom_metadata(CM_RACI)
raci = CustomMetadataDict(CM_RACI)
group1, group2 = _get_groups(client, make_unique)
raci[CM_ATTR_RACI_RESPONSIBLE] = [FIXED_USER]
raci[CM_ATTR_RACI_ACCOUNTABLE] = FIXED_USER
Expand All @@ -513,7 +513,7 @@ def test_replace_term_cm_ipr(
CM_RACI = make_unique("RACI")
CM_IPR = make_unique("IPR")
CM_QUALITY = make_unique("DQ")
term_cm_ipr = term.get_custom_metadata(CM_IPR)
term_cm_ipr = CustomMetadataDict(CM_IPR)
client.replace_custom_metadata(term.guid, term_cm_ipr)
t = client.retrieve_minimal(guid=term.guid, asset_type=AtlasGlossaryTerm)
assert t
Expand Down Expand Up @@ -800,7 +800,7 @@ def test_update_replacing_cm(
CM_RACI = make_unique("RACI")
CM_IPR = make_unique("IPR")
CM_QUALITY = make_unique("DQ")
raci = term.get_custom_metadata(CM_RACI)
raci = CustomMetadataDict(CM_RACI)
group1, group2 = _get_groups(client, make_unique)
raci[CM_ATTR_RACI_RESPONSIBLE] = [FIXED_USER]
raci[CM_ATTR_RACI_ACCOUNTABLE] = FIXED_USER
Expand Down

0 comments on commit b2d4c1e

Please sign in to comment.