forked from aws-cloudformation/cfn-lint
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathInstanceSize.py
131 lines (118 loc) · 5.49 KB
/
InstanceSize.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import cfnlint.helpers
from cfnlint.rules import CloudFormationLintRule
from cfnlint.rules import RuleMatch
from cfnlint.data import AdditionalSpecs
class InstanceSize(CloudFormationLintRule):
    """Check if Resources RDS Instance Size is compatible with the RDS type"""

    id = 'E3025'
    shortdesc = 'RDS instance type is compatible with the RDS type'
    description = (
        'Check the RDS instance types are supported by the type of RDS engine. '
        'Only if the values are strings will this be checked.'
    )
    source_url = 'https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.DBInstanceClass.html'
    tags = ['resources', 'rds']

    # Loaded once when the class is defined. The methods below index it as
    # license model -> engine -> region -> collection of instance classes.
    valid_instance_types = cfnlint.helpers.load_resource(
        AdditionalSpecs, 'RdsProperties.json'
    )

    def _get_license_model(self, engine, license_model):
        """Return the license model to validate against.

        A truthy ``license_model`` is returned unchanged. Otherwise a default
        is derived from ``engine``: 'license-included' if the engine is listed
        under that model, else 'bring-your-own-license' if listed there, else
        'general-public-license'.
        """
        if license_model:
            return license_model

        # Default to an empty tuple so a spec without one of these license
        # models cannot make the membership test fail on None.
        if engine in self.valid_instance_types.get('license-included', ()):
            default_model = 'license-included'
        elif engine in self.valid_instance_types.get('bring-your-own-license', ()):
            default_model = 'bring-your-own-license'
        else:
            default_model = 'general-public-license'

        self.logger.debug(
            'Based on Engine: %s we determined the default license will be %s',
            engine,
            default_model,
        )
        return default_model

    def get_resources(self, cfn):
        """Get resources that can be checked

        Returns a list of dicts with the keys 'Engine', 'DBInstanceClass',
        'Path' and 'LicenseModel', one per property scenario of every
        AWS::RDS::DBInstance resource whose three values are all strings.
        """
        results = []
        for resource_name, resource_values in cfn.get_resources(
            'AWS::RDS::DBInstance'
        ).items():
            properties = resource_values.get('Properties')
            if properties is None:
                # A resource without a Properties section has nothing to
                # validate here; skip it instead of failing on None.
                continue

            path = ['Resources', resource_name, 'Properties']
            # Properties items_safe helps remove conditions so we can focus on
            # the actual values and scenarios
            for prop_safe, prop_path_safe in properties.items_safe(path):
                engine = prop_safe.get('Engine')
                inst_class = prop_safe.get('DBInstanceClass')

                # All values must be strings, otherwise we cannot do validation
                if not (isinstance(engine, str) and isinstance(inst_class, str)):
                    self.logger.debug(
                        'Skip evaluation based on [Engine] or [DBInstanceClass] not being strings.'
                    )
                    continue

                # Need to get a default license model if none provided
                license_model = self._get_license_model(
                    engine, prop_safe.get('LicenseModel')
                )
                if not isinstance(license_model, str):
                    self.logger.debug(
                        'Skip evaluation based on [LicenseModel] not being a string.'
                    )
                    continue

                results.append(
                    {
                        'Engine': engine,
                        'DBInstanceClass': inst_class,
                        'Path': prop_path_safe,
                        'LicenseModel': license_model,
                    }
                )
        return results

    def check_db_config(self, properties, region):
        """Check db properties

        ``properties`` is one of the dicts built by ``get_resources``. Returns
        a list holding a single RuleMatch when the instance class is not in
        the allowed set for the license model, engine and region; otherwise an
        empty list. Engines or regions missing from the spec are not reported.
        """
        matches = []
        db_engine = properties.get('Engine')
        db_license = properties.get('LicenseModel')
        db_instance_class = properties.get('DBInstanceClass')

        if db_license not in self.valid_instance_types:
            self.logger.debug(
                'Skip evaluation based on license [%s] not matching.', db_license
            )
            return matches

        # None means "engine or region unknown to the spec"; an empty
        # collection is still a valid (if unsatisfiable) allow-list.
        allowed_classes = (
            self.valid_instance_types[db_license].get(db_engine, {}).get(region)
        )
        if allowed_classes is not None and db_instance_class not in allowed_classes:
            message = 'DBInstanceClass "{0}" is not compatible with engine type "{1}" and LicenseModel "{2}" in region "{3}". Use instance types [{4}]'
            matches.append(
                RuleMatch(
                    properties.get('Path') + ['DBInstanceClass'],
                    message.format(
                        db_instance_class,
                        db_engine,
                        db_license,
                        region,
                        ', '.join(map(str, allowed_classes)),
                    ),
                )
            )
        return matches

    def match(self, cfn):
        """Check RDS Resource Instance Sizes

        Runs ``check_db_config`` for every checkable resource scenario against
        every region in ``cfn.regions`` and returns the combined matches.
        """
        matches = []
        for resource in self.get_resources(cfn):
            for region in cfn.regions:
                matches.extend(self.check_db_config(resource, region))
        return matches