-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlambda-security-headers.py
executable file
·150 lines (129 loc) · 4.68 KB
/
lambda-security-headers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
""" Ensure that the security headers Lambda is connected to the CF Distribution """
#!/usr/bin/python3
import sys
import os
import boto3
# Names of the environment variables this script reads (the values are looked
# up at run time, these are only the variable names).

# AWS profile name handed to boto3.Session(profile_name=...)
_PROFILE = "AWS_STATIC_SITE_PROFILE"
# Id of the CloudFront distribution to inspect and update
_CFDIST = "CF_DIST_ID_STATIC_LO"
# Full ARN of the Lambda function; used when _LAMBDA_FUNC is not set
_LAMBDA_ARN = "CLOUDFRONT_ADD_SECURITY_HEADERS_ARN"
# Optional Lambda function name; when set, its highest version ARN is looked up
_LAMBDA_FUNC = "CLOUDFRONT_SECURITY_HEADERS_FUNC"
def get_env_var(env):
    """ Look up a mandatory environment variable.

    Returns the variable's value. If the variable is not set, the script
    terminates through sys.exit() with a message naming the variable.
    """
    try:
        return os.environ[env]
    except KeyError:
        sys.exit("Cannot retrieve environment variable '%s'" % env)
# def lambda_list_functions():
# profile = get_env_var(_PROFILE)
# session = boto3.Session(profile_name=profile)
# client = session.client('lambda', 'us-east-1')
# return client.list_functions()["Functions"]
# def get_function():
# functions = lambda_list_functions()
# for f in functions:
# if f["FunctionName"] == "cloudfront-add-security-headers":
# return f
# # Failed to find the Lambda function
# sys.exit(1)
def build_lambda_arn(function_name):
    """ Return the ARN of the highest numbered version of a Lambda function.

    function_name is passed to the Lambda ListVersionsByFunction API in
    us-east-1, using the AWS profile named by the _PROFILE environment
    variable. The "$LATEST" pseudo-version is ignored; the remaining version
    strings are compared numerically and the "FunctionArn" of the largest one
    is returned.

    Exits the script with an error message if the function has no numbered
    (published) version, since there would be no versioned ARN to return.
    """
    profile = get_env_var(_PROFILE)
    session = boto3.Session(profile_name=profile)
    client = session.client('lambda', 'us-east-1')
    # ListVersionsByFunction returns at most 50 versions per call, so every
    # page has to be read or the newest version can be missed.
    paginator = client.get_paginator('list_versions_by_function')
    highest_ver = None
    highest_arn = None
    for page in paginator.paginate(FunctionName=function_name):
        for version in page["Versions"]:
            if version["Version"] == "$LATEST":
                continue
            ver = int(version["Version"])
            if highest_ver is None or ver > highest_ver:
                highest_ver = ver
                highest_arn = version["FunctionArn"]
    if highest_arn is None:
        sys.exit(
            "Cannot find a published version of Lambda function '%s'" %
            function_name)
    return highest_arn
def get_lambda_arn():
    """ Work out the ARN of the Lambda function to use.

    When the optional _LAMBDA_FUNC environment variable is set, its value is
    treated as a function name and resolved through build_lambda_arn().
    Otherwise the ARN is read from the mandatory _LAMBDA_ARN environment
    variable.
    """
    name_from_env = os.environ.get(_LAMBDA_FUNC)
    if name_from_env is None:
        # No function name supplied: an explicit ARN is required instead
        return get_env_var(_LAMBDA_ARN)
    return build_lambda_arn(name_from_env)
def is_function_attached():
    """ Report whether the Lambda function is attached to the distribution.

    Fetches the distribution configuration and returns True when any Lambda
    function association on the default cache behaviour carries exactly the
    ARN returned by get_lambda_arn(); returns False otherwise, including when
    the behaviour has no association list at all.
    """
    wanted_arn = get_lambda_arn()
    aws_session = boto3.Session(profile_name=get_env_var(_PROFILE))
    cloudfront = aws_session.client('cloudfront', 'us-east-1')
    response = cloudfront.get_distribution_config(Id=get_env_var(_CFDIST))
    default_behaviour = response["DistributionConfig"]["DefaultCacheBehavior"]
    try:
        associations = default_behaviour["LambdaFunctionAssociations"]["Items"]
    except KeyError:
        # Either the associations block or its item list is absent
        return False
    return any(
        entry["LambdaFunctionARN"] == wanted_arn for entry in associations
    )
def attach_function():
    """ Attach or update the function connected to the distribution.

    Reads the distribution configuration, then edits the Lambda function
    associations of the default cache behaviour:

    * if an association already points at some version of the same function,
      its ARN is replaced with the ARN from get_lambda_arn();
    * otherwise a new "origin-response" association is added.

    The modified configuration is written back with update_distribution(),
    passing the ETag from the read as IfMatch.

    Exits the script with an error message if no function ARN is available.
    """
    func = get_lambda_arn()
    if func is None:
        sys.exit("Cannot determine the ARN of the security headers function")
    # Get the function ARN without the version number in it
    no_ver = func.rsplit(":", 1)[0]
    item_block = {
        "EventType": "origin-response",
        "LambdaFunctionARN": func
    }
    profile = get_env_var(_PROFILE)
    session = boto3.Session(profile_name=profile)
    cf_client = session.client('cloudfront', 'us-east-1')
    dist_id = get_env_var(_CFDIST)
    config = cf_client.get_distribution_config(Id=dist_id)
    distrib_config = config["DistributionConfig"]
    dcb = distrib_config["DefaultCacheBehavior"]
    if "LambdaFunctionAssociations" not in dcb:
        dcb["LambdaFunctionAssociations"] = {
            "Items": [
                item_block
            ],
            "Quantity": 1
        }
        print("Initialising LFA block to add security headers function")
    else:
        lfa_block = dcb["LambdaFunctionAssociations"]
        items = lfa_block.setdefault("Items", [])
        # Match on the whole unversioned ARN followed by ":" so that another
        # function whose name merely starts with ours is not overwritten.
        versioned_prefix = no_ver + ":"
        for lfa in items:
            existing_arn = lfa["LambdaFunctionARN"]
            if (existing_arn == no_ver or
                    existing_arn.startswith(versioned_prefix)):
                lfa["LambdaFunctionARN"] = func
                print("Updating existing security headers function")
                break
        else:
            items.append(item_block)
            # Derive the count from the list so the two cannot disagree
            print(
                "Adding security headers function, total of %s" %
                len(items))
        lfa_block["Quantity"] = len(items)
    cf_client.update_distribution(
        DistributionConfig=distrib_config,
        Id=dist_id,
        IfMatch=config["ETag"]
    )
    print("CloudFront distribution updated")
if __name__ == '__main__':
    # Script entry point: only modify the distribution when the function is
    # not attached to it yet.
    if not is_function_attached():
        attach_function()
    else:
        print("Security headers function is already attached")