Skip to content

Commit

Permalink
skip placeholder for service names and match catalog with servicecatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
gruebel committed Apr 1, 2024
1 parent 21be907 commit 8ed4857
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 1 deletion.
7 changes: 6 additions & 1 deletion policy_sentry/shared/iam_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,13 @@ def get_service_prefix_data(service_prefix: str) -> dict[str, Any]:
List: A list of metadata about that service
"""
try:
return cast("dict[str, Any]", iam_definition.get(service_prefix, {}))
return cast("dict[str, Any]", iam_definition[service_prefix])
# pylint: disable=bare-except, inconsistent-return-statements
except:
if service_prefix == "catalog":
# the resource types "Portfolio" and "Product" have the service name "catalog" in their ARN
# https://docs.aws.amazon.com/service-authorization/latest/reference/list_awsservicecatalog.html#awsservicecatalog-resources-for-iam-policies
return cast("dict[str, Any]", iam_definition["servicecatalog"])

logger.info(f"Service prefix not {service_prefix} found.")
return {}
6 changes: 6 additions & 0 deletions policy_sentry/writing/sid_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,12 @@ def add_by_arn_and_access_level(
"""
for arn in arn_list:
service_prefix = get_service_from_arn(arn)
if "$" in service_prefix:
logger.debug(
f"Not supported service {service_prefix} found in ARN {arn}"
)
continue

service_action_data = get_action_data(service_prefix, "*")
for service_prefix, action_data in service_action_data.items():
for row in action_data:
Expand Down
10 changes: 10 additions & 0 deletions test/writing/test_sid_group_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,16 @@ def test_resource_restriction_plus_dependent_action_simple_2(self):
# print(json.dumps(output, indent=4))
self.assertDictEqual(output, desired_output)

def test_add_by_arn_and_access_level_ignores_placeholder_services(self):
# https://github.com/salesforce/policy_sentry/issues/448

sid_group = SidGroup()
sid_group.add_by_arn_and_access_level(
["arn:${Partition}:${Vendor}:${Region}:*:${ResourceType}:${RecoveryPointId}"], "Read"
)

self.assertTrue(not sid_group.sids)

def test_add_by_list_of_actions(self):
actions_test_data_1 = ["kms:CreateCustomKeyStore", "kms:CreateGrant"]
sid_group = SidGroup()
Expand Down

0 comments on commit 8ed4857

Please sign in to comment.