Skip to content

Commit

Permalink
feat gsn-10845: beautify testing
Browse files Browse the repository at this point in the history
  • Loading branch information
carlovoSBP committed Jun 26, 2024
1 parent 24baefd commit e00b43e
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 117 deletions.
17 changes: 11 additions & 6 deletions awsfindingsmanagerlib/awsfindingsmanagerlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,11 @@ def _get_security_hub_client(region: str):
raise NoRegion(f'Security Hub client requires a valid region set to connect, message was:{msg}') from None
return client

def _get_security_hub_paginator_iterator(self, region: str, operation_name: str, query_filter: dict):
security_hub = self._get_security_hub_client(region=region)
paginator = security_hub.get_paginator(operation_name)
return paginator.paginate(Filters=query_filter)

@staticmethod
def _get_ec2_client(region: str):
kwargs = {}
Expand Down Expand Up @@ -712,18 +717,18 @@ def _get_findings(self, query_filter: Dict):
regions_to_retrieve = [aggregating_region] if aggregating_region else self.regions
for region in regions_to_retrieve:
self._logger.debug(f'Trying to get findings for region {region}')
security_hub = self._get_security_hub_client(region=region)
paginator = security_hub.get_paginator('get_findings')
iterator = paginator.paginate(Filters=query_filter)
iterator = self._get_security_hub_paginator_iterator(region, 'get_findings', query_filter)
try:
for page in iterator:
for finding_data in page['Findings']:
finding = Finding(finding_data)
self._logger.debug(f'Adding finding with id {finding.id}')
findings.add(finding)
except (security_hub.exceptions.InvalidAccessException, security_hub.exceptions.AccessDeniedException):
self._logger.debug(f'No access for Security Hub for region {region}.')
continue
except botocore.exceptions.ClientError as error:
if error.response['Error']['Code'] in ['AccessDeniedException', 'InvalidAccessException']:
self._logger.debug(f'No access for Security Hub for region {region}.')
continue
raise error
return list(findings)

@staticmethod
Expand Down
111 changes: 0 additions & 111 deletions tests/test_awsfindingsmanagerlib.py

This file was deleted.

59 changes: 59 additions & 0 deletions tests/test_requests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: test_awsfindingsmanagerlib.py
#
# Copyright 2023 Marwin Baumann
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
test_awsfindingsmanagerlib
----------------------------------
Tests for `awsfindingsmanagerlib` module.
.. _Google Python Style Guide:
http://google.github.io/styleguide/pyguide.html
"""

from betamax.fixtures import unittest

__author__ = '''Marwin Baumann <[email protected]>'''
__docformat__ = '''google'''
__date__ = '''21-11-2023'''
__copyright__ = '''Copyright 2023, Marwin Baumann'''
__credits__ = ["Marwin Baumann"]
__license__ = '''Apache Software License 2.0'''
__maintainer__ = '''Marwin Baumann'''
__email__ = '''<[email protected]>'''
__status__ = '''Development''' # "Prototype", "Development", "Production".


class TestAwsfindingsmanagerlib(unittest.BetamaxTestCase):

def setUp(self):
"""
Test set up
This is where you can setup things that you use throughout the tests. This method is called before every test.
"""
pass

def tearDown(self):
"""
Test tear down
This is where you should tear down what you've setup in setUp before. This method is called after every test.
"""
pass
68 changes: 68 additions & 0 deletions tests/test_suppressions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: test_suppressions.py
#
# Copyright 2024 Carlo van Overbeek
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
test_suppressions
----------------------------------
Tests for `awsfindingsmanagerlib` module.
.. _Google Python Style Guide:
http://google.github.io/styleguide/pyguide.html
"""

from unittest.mock import patch, MagicMock
from .utils import FindingsManager, TestCaseWithBatchUpdateFindings
from awsfindingsmanagerlib import Local
import json

__author__ = '''Carlo van Overbeek <[email protected]>'''
__docformat__ = '''google'''
__date__ = '''26-06-2024'''
__copyright__ = '''Copyright 2024, Carlo van Overbeek'''
__credits__ = ["Carlo van Overbeek"]
__license__ = '''Apache Software License 2.0'''
__maintainer__ = '''Carlo van Overbeek'''
__email__ = '''<[email protected]>'''
__status__ = '''Development''' # "Prototype", "Development", "Production".


with open('tests/fixtures/findings.json', encoding='utf-8') as findings_file:
findings_fixture = [json.load(findings_file)]

with open('tests/fixtures/batch_update_findings.json', encoding='utf-8') as updates_file:
batch_update_findings_fixture = json.load(updates_file)

class TestBasicRun(TestCaseWithBatchUpdateFindings):

@patch('awsfindingsmanagerlib.FindingsManager._get_security_hub_paginator_iterator', lambda *_: findings_fixture)
@patch('awsfindingsmanagerlib.FindingsManager._batch_update_findings')
def test_basic_run(self, _batch_update_findings_mocked: MagicMock):
# basic init
local_backend = Local(path='./tests/fixtures/suppressions.yaml')
rules = local_backend.get_rules()

findings_manager = FindingsManager()
findings_manager.register_rules(rules)

# basic suppression in action
self.assertTrue(findings_manager.suppress_matching_findings())

# created payload validation
self.assert_batch_update_findings_called_once_with(batch_update_findings_fixture, _batch_update_findings_mocked)
81 changes: 81 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: utils.py
#
# Copyright 2024 Carlo van Overbeek
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
utils
----------------------------------
Test utils for `awsfindingsmanagerlib` module.
.. _Google Python Style Guide:
http://google.github.io/styleguide/pyguide.html
"""

from awsfindingsmanagerlib import FindingsManager as FindingsManagerToMock
from unittest.mock import MagicMock
from unittest import TestCase

__author__ = '''Carlo van Overbeek <[email protected]>'''
__docformat__ = '''google'''
__date__ = '''26-06-2024'''
__copyright__ = '''Copyright 2024, Carlo van Overbeek'''
__credits__ = ["Carlo van Overbeek"]
__license__ = '''Apache Software License 2.0'''
__maintainer__ = '''Carlo van Overbeek'''
__email__ = '''<[email protected]>'''
__status__ = '''Development''' # "Prototype", "Development", "Production".


class FindingsManager(FindingsManagerToMock):

@staticmethod
def _get_ec2_client(region: str):
return MagicMock()

@staticmethod
def _get_security_hub_client(region: str):
return MagicMock()

@staticmethod
def _get_sts_client():
return MagicMock()


class TestCaseWithBatchUpdateFindings(TestCase):

def assert_batch_update_findings_called_once_with(self, batch_update_findings_expected: dict, _batch_update_findings_mocked: MagicMock):
"""
Compare expected to actual (=mocked) api call payload.
Sadly, something like this does not work: _batch_update_findings_mocked.assert_called_once_with(ANY, batch_update_findings),
because FindingIdentifiers is a randomly ordered collection.
"""
_batch_update_findings_mocked.assert_called_once()

received_args = _batch_update_findings_mocked.call_args.args[1]

self.assertEqual(batch_update_findings_expected.keys(), received_args.keys())

self.assertEqual(batch_update_findings_expected['Note'], received_args['Note'])
self.assertEqual(batch_update_findings_expected['Workflow'], received_args['Workflow'])

self.assertEqual(len(batch_update_findings_expected['FindingIdentifiers']), len(received_args['FindingIdentifiers']))

for item in batch_update_findings_expected['FindingIdentifiers']:
self.assertIn(item, received_args['FindingIdentifiers'])

0 comments on commit e00b43e

Please sign in to comment.