forked from hashicorp/terraform-sentinel-policies
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaws-functions.sentinel
345 lines (306 loc) · 13.9 KB
/
aws-functions.sentinel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# Common functions for use with the AWS provider
##### Imports #####
import "tfconfig-functions" as config
import "tfplan/v2" as tfplan
import "tfconfig/v2" as tfconfig
import "strings"
import "types"
##### Functions #####
### find_resources_with_standard_tags ###
find_resources_with_standard_tags = func(resource_types) {
resources = filter tfplan.resource_changes as address, rc {
rc.provider_name matches "(.*)aws$" and
rc.type in resource_types and
rc.mode is "managed" and
(rc.change.actions contains "create" or rc.change.actions contains "update" or
rc.change.actions contains "read" or rc.change.actions contains "no-op")
}
return resources
}
### determine_role_arn ###
# This can only determine the role_arn if it is set to either a hard-coded value
# or to a reference to a single Terraform variable.
# It sets the role to "complex" if it finds a single non-variable reference
# or if it finds multiple references.
# It sets the role to "none" if no role arn is found.
determine_role_arn = func(address, data) {
# Return empty string if provider does not assume a role
role_arn_value = "none"
# Check for role_arn
if (length(data.config) else 0) > 0 and
(length(data.config.assume_role) else 0) > 0 and
data.config.assume_role[0].role_arn else null is not null {
role_arn_data = data.config.assume_role[0].role_arn
# Check for constant value or references
if role_arn_data.constant_value else null is not null {
# role_arn of AWS provider was hard-coded role_arn
#print("Found a single constant value for role_arn.")
role_arn_value = role_arn_data.constant_value
} else if role_arn_data.references else [] is not [] {
if length(role_arn_data.references) == 1 {
# Process references
role_arn = role_arn_data.references[0]
if role_arn matches "var\\.(.*)" {
# role_arn of AWS provider was a variable
#print("Found a single variable reference for role_arn.")
role_arn_variable = strings.trim_prefix(role_arn, "var.")
role_arn_value = tfplan.variables[role_arn_variable].value
} else {
# reference was not a variable
print("Found a single reference in the role_arn attribute,",
"for provider", address, "but it was not a variable.")
print("This policy only handles a role_arn attribute that is",
"a constant value or a single reference to a variable.")
# Set role_arn_value to null to cause failure of policy
role_arn_value = "complex"
} // end if role_arn is variable
} else {
print("Found more than one reference in the role_arn attribute",
"for provider", address)
print("This policy only handles a role_arn attribute that is",
"a constant value or a single reference to a variable.")
# Set role_arn_value to null to cause failure of policy
role_arn_value = "complex"
} // end if single reference
} // end if constant_value or references
} else {
#print("Did not find role_arn.")
} // end if assume_role.role_arn in config
return role_arn_value
}
### get_assumed_roles ###
# Get assumed roles from all AWS providers
# Please note that the assumed roles returned could include "none" or "complex";
# See the comments for the determine_role_arn function above.
get_assumed_roles = func() {
# Initialize empty map of roles indexed by aliases
assumed_roles = {}
# Get all AWS provider aliases
aws_providers = config.find_providers_by_type("aws")
# Iterate through all AWS provider aliases
for aws_providers as address, data {
assumed_roles[address] = determine_role_arn(address, data)
} // end aws_providers
return assumed_roles
}
### validate_assumed_roles_with_list ###
# Validate that all assumed roles are allowed.
# If you want to the policy to pass if an assumed role contains a single
# non-variable reference or if it finds multiple references, then include a role
# called "complex" in the allowed_roles list.
validate_assumed_roles_with_list = func(allowed_roles) {
validated = true
assumed_roles = get_assumed_roles()
for assumed_roles as address, role {
if role is not "none" and role not in allowed_roles {
print("AWS provider", address, "has assumed role",
role, "that is not allowed.")
validated = false
}
}
return validated
}
### validate_assumed_roles_with_map ###
# Validate that all assumed roles are allowed for the current workspace.
# If you want to a policy to pass if an assumed role contains a single
# non-variable reference or if it finds multiple references, then include a role
# called "complex" in the map passed to this function and associate it
# with workspaces.
validate_assumed_roles_with_map = func(roles_map, workspace_name) {
validated = true
assumed_roles = get_assumed_roles()
for assumed_roles as address, role {
if role is not "none" {
if role not in keys(roles_map) {
print("AWS provider", address, "has assumed role",
role, "that is not allowed.")
validated = false
} else {
# Validate that role is allowed for current workspace
matched = false
for roles_map[role] as workspace_regex {
if workspace_name matches workspace_regex {
matched = true
}
} // end for workspace_regex
if not matched {
print("Workspace", workspace_name, "is not allowed to use role", role)
print("It used that role in the AWS provider", address)
validated = false
} // end matched check
} // end role in roles_map
} // end if role is not ""
} // end for assumed_roles
return validated
}
### filter_providers_by_regions ###
# Filter instances of the AWS provider to those in a specific region using the
# tfconfig/v2 and tfplan/v2 imports.
# See the comments on the validate_provider_in_allowed_regions() function below
# for details on how this is done.
# The parameter, aws_providers, should be a list of AWS provider aliases
# derived from tfconfig.providers.
# The parameter, allowed_regions, should be given as a list of AWS regions
# such as `["us-east-1" and "eu-west-2"]`.
filter_providers_by_regions = func(aws_providers, allowed_regions) {
# Initialize empty list of validated AWS provider instances
validated_providers = {}
# Process AWS providers
for aws_providers as index, p {
if "config" in keys(p) and "region" in keys(p.config) {
# provider alias has a region
validated = validate_provider_in_allowed_regions(p, allowed_regions)
if validated {
print("validated provider:", index)
validated_providers[index] = p
}
} else {
# provider alias does not have region
# So, we process it as an alias to a root module provider
# A provider that does not have its own region should look like
# <module_address>:<provider>.<alias>
# we want to look at <provider>.<alias>
p_segments = strings.split(p.provider_config_key, ":")
if length(p_segments) == 1 {
# Current provider is already in root module
# So, it is not an alias to another provider
# Continue on to next resource in for loop
continue
}
# Get the provider in root module that current provider aliases
p_alias_name = p_segments[1]
p_alias = tfconfig.providers[p_alias_name] else null
# Process p_alias
if p_alias is not null {
validated =
validate_provider_in_allowed_regions(p_alias, allowed_regions)
if validated {
print("validated provider:", index)
validated_providers[index] = p
}
} // end p_alias not null
} // end else no region
} // end for
# return validated providers
return validated_providers
}
### validate_provider_in_allowed_regions ###
# Validate if a specific provider instance is in a list of regions using the # tfconfig/v2 and tfplan/v2 iimports.
# The parameter, p, should be a provider derived from tfconfig.providers
# or from provider_config_key of a resource from tfconfig.resources.
# The parameter, regions, should be given as a list of allowed regions
# It attempts to identify the region of the provider aliases in several ways
# including constant values assigned to their `region` argument and resolution
# of references to variables. It first tries to process references to variables
# as strings, then as maps with a key called "region". It handles references
# to variables in the root module by using tfplan.variables. It handles references
# to variables in non-root modules by examining the module call from the current
# module's parent.
# It even tries to match provider aliases in proxy configuration blocks
# (which do not specify regions) of child modules to similarly-named provider
# aliases in the root module.
# If the alias passed in the module call does not match the alias in the root
# module, Sentinel has no way of linking the two provider aliases. However,
# since all providers that do specify regions will be restricted and since
# provider alias proxies must point to other provider aliases in ancestor modules,
# all provider aliases should be restricted by this policy.
validate_provider_in_allowed_regions = func(p, regions) {
if "config" in keys(p) and "region" in keys(p.config) {
# provider has its own region
if "constant_value" in keys(p.config.region) {
if p.config.region.constant_value in regions {
# Found resource from region
return true
} else {
return false
}
} else if "references" in keys(p.config.region) {
# Iterate over references in region
# Usually, there should only be one reference
for p.config.region.references as reference {
# Cross reference variables
# Note that this function does not handle other types of references
if strings.has_prefix(reference, "var.") {
variable_name = strings.split(reference, ".")[1]
if p.module_address is "" {
# Get root module variable from tfplan.variables
variable_value = tfplan.variables[variable_name].value
# Check type of variable
variable_type = types.type_of(variable_value)
if variable_type is "string" and variable_value in regions {
# Found resource from region
return true
} else if variable_type is "map" and
"region" in keys(variable_value) and
variable_value["region"] in regions {
# Found resource from region
return true
} // end variable_type check
} else {
# Get non-root module variable
# Could be value passed into variable by module call
# or be the default value of a variable in the module
# First find parent module
module_segments = strings.split(p.module_address, ".")
num_segments = length(module_segments)
parent_module = strings.join(module_segments[0:num_segments-2], ".")
current_module_name = module_segments[num_segments-1]
# Find module call that called current module
if parent_module is "" {
# parent module is root module
mc = tfconfig.module_calls[current_module_name]
} else {
# parent module is not root module
mc = tfconfig.module_calls[parent_module + ":" + current_module_name]
}
# Check if referenced variable was passed in the module call
if variable_name in keys(mc.config) {
mc_var = mc.config[variable_name]
if "constant_value" in keys(mc_var) {
if mc_var.constant_value in regions {
# Found resource from region
return true
} else {
return false
}
} else if "references" in keys(mc_var) and parent_module is "" {
// Check root module variable references
for mc_var.references as mc_reference {
if strings.has_prefix(mc_reference, "var.") {
mc_variable_name = strings.split(mc_reference, ".")[1]
mc_variable_value = tfplan.variables[mc_variable_name].value
mc_variable_type = types.type_of(mc_variable_value)
if mc_variable_type is "string" and mc_variable_value in regions {
# Found resource from region
return true
} else if mc_variable_type is "map" and
"region" in keys(mc_variable_value) and
mc_variable_value["region"] in regions {
# Found resource from region
return true
} // end mc_variable_type check
} // end if mc_reference is a root module variable
} // end for mc_var.references
} // end constant_value or references in module call
} else {
# check default value of variable in current module
default_value = tfconfig.variables[variable_name].default
default_value_type = types.type_of(default_value)
if default_value_type is "string" and default_value in regions {
# Found resource from region
return true
} else if default_value_type is "map" and
"region" in keys(default_value) and
default_value["region"] in regions {
# Found resource from region
return true
}
} // end if matched variable in module config or default value
} // end else non-root module
} // end reference is a variable
} // end for references
} // end references
} // end if p has config.region
# If we did not already return true, then return false
return false
}