diff --git a/policy_sentry/shared/iam_data.py b/policy_sentry/shared/iam_data.py index 7a0ddc15..0676eaaa 100644 --- a/policy_sentry/shared/iam_data.py +++ b/policy_sentry/shared/iam_data.py @@ -45,8 +45,13 @@ def get_service_prefix_data(service_prefix: str) -> dict[str, Any]: List: A list of metadata about that service """ try: - return cast("dict[str, Any]", iam_definition.get(service_prefix, {})) + return cast("dict[str, Any]", iam_definition[service_prefix]) # pylint: disable=bare-except, inconsistent-return-statements except: + if service_prefix == "catalog": + # the resource types "Portfolio" and "Product" have the service name "catalog" in their ARN + # https://docs.aws.amazon.com/service-authorization/latest/reference/list_awsservicecatalog.html#awsservicecatalog-resources-for-iam-policies + return cast("dict[str, Any]", iam_definition["servicecatalog"]) + logger.info(f"Service prefix not {service_prefix} found.") return {} diff --git a/policy_sentry/writing/sid_group.py b/policy_sentry/writing/sid_group.py index 7c56e84e..ed078c37 100644 --- a/policy_sentry/writing/sid_group.py +++ b/policy_sentry/writing/sid_group.py @@ -320,6 +320,12 @@ def add_by_arn_and_access_level( """ for arn in arn_list: service_prefix = get_service_from_arn(arn) + if "$" in service_prefix: + logger.debug( + f"Not supported service {service_prefix} found in ARN {arn}" + ) + continue + service_action_data = get_action_data(service_prefix, "*") for service_prefix, action_data in service_action_data.items(): for row in action_data: diff --git a/test/writing/test_sid_group_crud.py b/test/writing/test_sid_group_crud.py index a6759b59..bf085fb8 100644 --- a/test/writing/test_sid_group_crud.py +++ b/test/writing/test_sid_group_crud.py @@ -297,6 +297,16 @@ def test_resource_restriction_plus_dependent_action_simple_2(self): # print(json.dumps(output, indent=4)) self.assertDictEqual(output, desired_output) + def test_add_by_arn_and_access_level_ignores_placeholder_services(self): + # https://github.com/salesforce/policy_sentry/issues/448 + + sid_group = SidGroup() + sid_group.add_by_arn_and_access_level( + ["arn:${Partition}:${Vendor}:${Region}:*:${ResourceType}:${RecoveryPointId}"], "Read" + ) + + self.assertTrue(not sid_group.sids) + def test_add_by_list_of_actions(self): actions_test_data_1 = ["kms:CreateCustomKeyStore", "kms:CreateGrant"] sid_group = SidGroup()